This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f5c0d16c680 Update `it` module. (#29232)
f5c0d16c680 is described below

commit f5c0d16c6800f766c8ccc75219b341c76cf87da1
Author: Pranav Bhandari <bhandari.prana...@gmail.com>
AuthorDate: Tue Oct 31 19:04:15 2023 -0400

    Update `it` module. (#29232)
---
 .../cassandra/CassandraResourceManagerUtils.java   |   2 +-
 .../it/cassandra/matchers/CassandraAsserts.java    |   2 +-
 .../it/cassandra/CassandraResourceManagerTest.java |   3 +-
 .../java/org/apache/beam/it/gcp/LoadTestBase.java  |  10 +-
 .../beam/it/gcp/artifacts/utils/JsonTestUtil.java  | 149 +++++++++++
 .../it/gcp/dataflow/DefaultPipelineLauncher.java   |   6 +-
 .../beam/it/gcp/dataflow/DirectRunnerClient.java   |   6 +-
 .../beam/it/gcp/datagenerator/DataGenerator.java   |  54 ++--
 .../gcp/datastore/matchers/DatastoreAsserts.java   |   3 +-
 .../apache/beam/it/gcp/dlp/DlpResourceManager.java |   5 +-
 .../apache/beam/it/gcp/kms/KMSResourceManager.java |   5 +-
 .../beam/it/gcp/monitoring/MonitoringClient.java   |  12 +-
 .../it/gcp/spanner/matchers/SpannerAsserts.java    |  45 ++++
 .../apache/beam/it/gcp/bigtable/BigTableIOLT.java  |  18 +-
 .../apache/beam/it/gcp/spanner/SpannerIOLT.java    | 285 +++++++++++++++++++++
 .../apache/beam/it/gcp/storage/FileBasedIOLT.java  |   2 +-
 .../beam/it/jdbc/AbstractJDBCResourceManager.java  |  63 ++---
 .../apache/beam/it/jdbc/JDBCResourceManager.java   |  11 +-
 .../apache/beam/it/jdbc/MSSQLResourceManager.java  |   7 +-
 .../apache/beam/it/jdbc/MySQLResourceManager.java  |   4 +-
 .../apache/beam/it/jdbc/OracleResourceManager.java |   8 +-
 .../beam/it/jdbc/PostgresResourceManager.java      |  20 +-
 .../apache/beam/it/kafka/KafkaResourceManager.java |  11 +-
 .../beam/it/mongodb/MongoDBResourceManager.java    |  10 +-
 .../beam/it/mongodb/matchers/MongoDBAsserts.java   |   2 +-
 .../apache/beam/it/neo4j/Neo4jResourceManager.java |  11 +-
 .../beam/it/neo4j/conditions/Neo4jQueryCheck.java  |  15 +-
 .../beam/it/splunk/SplunkResourceManager.java      |  10 +-
 .../TestContainerResourceManager.java              |  15 +-
 .../beam/it/truthmatchers/RecordsSubject.java      |   2 +-
 30 files changed, 638 insertions(+), 158 deletions(-)

diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java
index ef617de518b..f0180076378 100644
--- a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java
+++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java
@@ -30,7 +30,7 @@ final class CassandraResourceManagerUtils {
       Pattern.compile("[/\\\\. \"\0$]"); // i.e. [/\. "$]
   private static final String REPLACE_DATABASE_NAME_CHAR = "-";
   private static final DateTimeFormatter TIME_FORMAT =
-      DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss");
+      DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss-SSSSSS");
 
   private CassandraResourceManagerUtils() {}
 
diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java
index 6aecc6609cf..61f730bf357 100644
--- a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java
+++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java
@@ -31,7 +31,7 @@ import org.apache.beam.it.truthmatchers.RecordsSubject;
 public class CassandraAsserts {
 
   /**
-   * Convert Cassandra {@link com.datastax.oss.driver.api.core.cql.Row} list to a list of maps.
+   * Convert Cassandra {@link Row} list to a list of maps.
    *
    * @param rows Rows to parse.
    * @return List of maps to use in {@link RecordsSubject}.
diff --git a/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
index fe00457159f..318ef6d76c6 100644
--- a/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
+++ b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
@@ -72,7 +72,8 @@ public class CassandraResourceManagerTest {
 
   @Test
   public void testGetKeyspaceNameShouldReturnCorrectValue() {
-    assertThat(testManager.getKeyspaceName()).matches(TEST_ID.replace('-', '_') + "_\\d{8}_\\d{6}");
+    assertThat(testManager.getKeyspaceName())
+        .matches(TEST_ID.replace('-', '_') + "_\\d{8}_\\d{6}_\\d{6}");
   }
 
   @Test
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
index 14bb05394de..44a439b0ce9 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
@@ -516,20 +516,20 @@ public abstract class LoadTestBase {
 
     public abstract @Nullable String outputPCollectionV2();
 
-    public static MetricsConfiguration.Builder builder() {
+    public static Builder builder() {
       return new AutoValue_LoadTestBase_MetricsConfiguration.Builder();
     }
 
     @AutoValue.Builder
     public abstract static class Builder {
 
-      public abstract MetricsConfiguration.Builder setInputPCollection(@Nullable String value);
+      public abstract Builder setInputPCollection(@Nullable String value);
 
-      public abstract MetricsConfiguration.Builder setInputPCollectionV2(@Nullable String value);
+      public abstract Builder setInputPCollectionV2(@Nullable String value);
 
-      public abstract MetricsConfiguration.Builder setOutputPCollection(@Nullable String value);
+      public abstract Builder setOutputPCollection(@Nullable String value);
 
-      public abstract MetricsConfiguration.Builder setOutputPCollectionV2(@Nullable String value);
+      public abstract Builder setOutputPCollectionV2(@Nullable String value);
 
       public abstract MetricsConfiguration build();
     }
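
Usage sketch (not part of this patch): with the shortened Builder returned above, a load test configures metrics roughly as follows; the PCollection names mirror the SpannerIOLT test added later in this commit.

    // Sketch only: pick the PCollections whose throughput metrics should be collected.
    MetricsConfiguration metricsConfig =
        MetricsConfiguration.builder()
            .setInputPCollection("Map records.out0")
            .setOutputPCollection("Counting element.out0")
            .build();
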
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/utils/JsonTestUtil.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/utils/JsonTestUtil.java
index 1ef12d33fa1..9a83558f7bf 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/utils/JsonTestUtil.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/utils/JsonTestUtil.java
@@ -18,13 +18,22 @@
 package org.apache.beam.it.gcp.artifacts.utils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /**
 * The {@link JsonTestUtil} class provides common utilities used for executing tests that involve
@@ -56,6 +65,67 @@ public class JsonTestUtil {
     return records;
   }
 
+  /**
+   * Reads NDJSON (Newline Delimited JSON) data from a byte array and returns a list of parsed JSON
+   * objects. Each JSON object is represented as a Map of String keys to Object values.
+   *
+   * @param jsonBytes A byte array containing NDJSON data.
+   * @return A list of parsed JSON objects as {@code Map<String, Object>}.
+   * @throws IOException if there's an issue reading or parsing the data.
+   */
+  public static List<Map<String, Object>> readNDJSON(byte[] jsonBytes) throws IOException {
+    try (ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonBytes)) {
+      InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+      JsonMapper mapper = new JsonMapper();
+
+      return new BufferedReader(reader)
+          .lines()
+          .map(
+              line -> {
+                try {
+                  // Deserialize each line as a Map<String, Object>
+                  return mapper.readValue(line, mapTypeRef);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              })
+          .collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Recursively sorts the keys of a nested JSON represented as a Map.
+   *
+   * @param jsonMap A {@code Map<String, Object>} representing the nested JSON.
+   * @return A sorted {@code Map<String, Object>} where the keys are sorted in natural order.
+   */
+  public static Map<String, Object> sortJsonMap(Map<String, Object> jsonMap) {
+    return jsonMap.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> {
+                  Object value = entry.getValue();
+                  if (value instanceof Map) {
+                    return sortJsonMap((Map<String, Object>) value);
+                  } else if (value instanceof List) {
+                    return ((List<Object>) value)
+                        .stream()
+                            .map(
+                                item ->
+                                    item instanceof Map
+                                        ? sortJsonMap((Map<String, Object>) item)
+                                        : item)
+                            .collect(Collectors.toList());
+                  } else {
+                    return value;
+                  }
+                },
+                (a, b) -> a, // Merge function (not needed for a TreeMap)
+                TreeMap::new // Resulting map is a TreeMap
+                ));
+  }
+
   /**
    * Read JSON records to a list of Maps.
    *
@@ -86,4 +156,83 @@ public class JsonTestUtil {
   public static Map<String, Object> readRecord(String contents) throws IOException {
     return readRecord(contents.getBytes(StandardCharsets.UTF_8));
   }
+
+  /**
+   * Parses a JSON string and returns either a List of Maps or a Map, depending on whether the JSON
+   * represents an array or an object.
+   *
+   * @param jsonString The JSON string to parse.
+   * @return A List of Maps if the JSON is an array, or a Map if it's an object.
+   * @throws IOException If there's an error while parsing the JSON string.
+   */
+  public static Object parseJsonString(String jsonString) throws IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+    JsonNode jsonNode = objectMapper.readTree(jsonString);
+    if (jsonNode.isArray()) {
+      return parseJsonArray((ArrayNode) jsonNode);
+    } else if (jsonNode.isObject()) {
+      return parseJsonObject(jsonNode);
+    } else {
+      throw new IllegalArgumentException("Input is not a valid JSON object or 
array.");
+    }
+  }
+
+  /**
+   * Parses a JSON array represented by an ArrayNode and returns a List of Maps.
+   *
+   * @param arrayNode The JSON array to parse.
+   * @return A List of Maps containing the parsed data.
+   */
+  private static List<Object> parseJsonArray(ArrayNode arrayNode) {
+    List<Object> result = new ArrayList<>();
+    for (JsonNode element : arrayNode) {
+      if (element.isObject()) {
+        result.add(parseJsonObject(element));
+      } else {
+        result.add(parseSimpleNode(element));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Parses a JSON object represented by a JsonNode and returns a Map.
+   *
+   * @param objectNode The JSON object to parse.
+   * @return A Map containing the parsed data.
+   */
+  private static Map<String, Object> parseJsonObject(JsonNode objectNode) {
+    Map<String, Object> result = new HashMap<>();
+    objectNode
+        .fields()
+        .forEachRemaining(
+            entry -> {
+              String key = entry.getKey();
+              JsonNode value = entry.getValue();
+              if (value.isObject()) {
+                result.put(key, parseJsonObject(value));
+              } else if (value.isArray()) {
+                result.put(key, parseJsonArray((ArrayNode) value));
+              } else {
+                result.put(key, parseSimpleNode(value));
+              }
+            });
+    return result;
+  }
+
+  /** Parse following value from JSON node: text, number, boolean, null. */
+  @SuppressWarnings("nullness")
+  private static Object parseSimpleNode(JsonNode element) {
+    if (element.isTextual()) {
+      return element.asText();
+    } else if (element.isNumber()) {
+      return element.numberValue();
+    } else if (element.isBoolean()) {
+      return element.asBoolean();
+    } else if (element.isNull()) {
+      return null;
+    } else {
+      throw new IllegalArgumentException("Element is not a valid JSON object 
or array.");
+    }
+  }
 }
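
Usage sketch for the new JSON helpers (illustrative only, not part of the patch; assumes the usual java.io/java.util imports and a test method that declares throws IOException):

    // Parse two NDJSON records, normalize key order, then parse a standalone JSON array.
    byte[] ndjson = "{\"b\":2,\"a\":1}\n{\"a\":3,\"b\":4}\n".getBytes(StandardCharsets.UTF_8);
    List<Map<String, Object>> records = JsonTestUtil.readNDJSON(ndjson);    // 2 records
    Map<String, Object> sorted = JsonTestUtil.sortJsonMap(records.get(0));  // keys ordered: a, b
    Object parsed = JsonTestUtil.parseJsonString("[{\"a\": 1}]");           // a List holding one Map
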
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
index 3d43618821a..620d24d4e11 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
@@ -99,7 +99,7 @@ public class DefaultPipelineLauncher extends AbstractPipelineLauncher {
           .put(PipelineResult.State.UNRECOGNIZED, JobState.UNKNOWN)
           .build();
 
-  private DefaultPipelineLauncher(DefaultPipelineLauncher.Builder builder) {
+  private DefaultPipelineLauncher(Builder builder) {
     super(
         new Dataflow(
             Utils.getDefaultTransport(),
@@ -109,8 +109,8 @@ public class DefaultPipelineLauncher extends AbstractPipelineLauncher {
                 : new HttpCredentialsAdapter(builder.getCredentials())));
   }
 
-  public static DefaultPipelineLauncher.Builder builder(Credentials credentials) {
-    return new DefaultPipelineLauncher.Builder(credentials);
+  public static Builder builder(Credentials credentials) {
+    return new Builder(credentials);
   }
 
   @Override
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java
index 57f8ad40c1b..8017009ff37 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java
@@ -53,8 +53,8 @@ public class DirectRunnerClient implements PipelineLauncher {
     this.mainClass = builder.getMainClass();
   }
 
-  public static DirectRunnerClient.Builder builder(Class<?> mainClass) {
-    return new DirectRunnerClient.Builder(mainClass);
+  public static Builder builder(Class<?> mainClass) {
+    return new Builder(mainClass);
   }
 
   @Override
@@ -172,7 +172,7 @@ public class DirectRunnerClient implements PipelineLauncher {
       return mainClass;
     }
 
-    public DirectRunnerClient.Builder setCredentials(Credentials value) {
+    public Builder setCredentials(Credentials value) {
       credentials = value;
       return this;
     }
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
index 832a75defd9..99016b5dd3a 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
@@ -61,16 +61,14 @@ public class DataGenerator {
             .build();
   }
 
-  public static DataGenerator.Builder builderWithSchemaLocation(
-      String testName, String schemaLocation) {
-    return new DataGenerator.Builder(testName + "-data-generator")
+  public static Builder builderWithSchemaLocation(String testName, String schemaLocation) {
+    return new Builder(testName + "-data-generator")
         .setSchemaLocation(schemaLocation)
         .setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
   }
 
-  public static DataGenerator.Builder builderWithSchemaTemplate(
-      String testName, String schemaTemplate) {
-    return new DataGenerator.Builder(testName + "-data-generator")
+  public static Builder builderWithSchemaTemplate(String testName, String schemaTemplate) {
+    return new Builder(testName + "-data-generator")
         .setSchemaTemplate(schemaTemplate)
         .setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
   }
@@ -131,27 +129,27 @@ public class DataGenerator {
       return parameters;
     }
 
-    public DataGenerator.Builder setSchemaTemplate(String value) {
+    public Builder setSchemaTemplate(String value) {
       parameters.put("schemaTemplate", value);
       return this;
     }
 
-    public DataGenerator.Builder setSchemaLocation(String value) {
+    public Builder setSchemaLocation(String value) {
       parameters.put("schemaLocation", value);
       return this;
     }
 
-    public DataGenerator.Builder setMessagesLimit(String value) {
+    public Builder setMessagesLimit(String value) {
       parameters.put(MESSAGES_LIMIT, value);
       return this;
     }
 
-    public DataGenerator.Builder setQPS(String value) {
+    public Builder setQPS(String value) {
       parameters.put("qps", value);
       return this;
     }
 
-    public DataGenerator.Builder setSinkType(String value) {
+    public Builder setSinkType(String value) {
       parameters.put("sinkType", value);
       return this;
     }
@@ -166,87 +164,87 @@ public class DataGenerator {
       return this;
     }
 
-    public DataGenerator.Builder setMaxNumWorkers(String value) {
+    public Builder setMaxNumWorkers(String value) {
       parameters.put("maxNumWorkers", value);
       return this;
     }
 
-    public DataGenerator.Builder setAutoscalingAlgorithm(AutoscalingAlgorithmType value) {
+    public Builder setAutoscalingAlgorithm(AutoscalingAlgorithmType value) {
       parameters.put("autoscalingAlgorithm", value.toString());
       return this;
     }
 
-    public DataGenerator.Builder setOutputDirectory(String value) {
+    public Builder setOutputDirectory(String value) {
       parameters.put("outputDirectory", value);
       return this;
     }
 
-    public DataGenerator.Builder setOutputType(String value) {
+    public Builder setOutputType(String value) {
       parameters.put("outputType", value);
       return this;
     }
 
-    public DataGenerator.Builder setNumShards(String value) {
+    public Builder setNumShards(String value) {
       parameters.put("numShards", value);
       return this;
     }
 
-    public DataGenerator.Builder setAvroSchemaLocation(String value) {
+    public Builder setAvroSchemaLocation(String value) {
       parameters.put("avroSchemaLocation", value);
       return this;
     }
 
-    public DataGenerator.Builder setTopic(String value) {
+    public Builder setTopic(String value) {
       parameters.put("topic", value);
       return this;
     }
 
-    public DataGenerator.Builder setProjectId(String value) {
+    public Builder setProjectId(String value) {
       parameters.put("projectId", value);
       return this;
     }
 
-    public DataGenerator.Builder setSpannerInstanceName(String value) {
+    public Builder setSpannerInstanceName(String value) {
       parameters.put("spannerInstanceName", value);
       return this;
     }
 
-    public DataGenerator.Builder setSpannerDatabaseName(String value) {
+    public Builder setSpannerDatabaseName(String value) {
       parameters.put("spannerDatabaseName", value);
       return this;
     }
 
-    public DataGenerator.Builder setSpannerTableName(String value) {
+    public Builder setSpannerTableName(String value) {
       parameters.put("spannerTableName", value);
       return this;
     }
 
-    public DataGenerator.Builder setDriverClassName(String value) {
+    public Builder setDriverClassName(String value) {
       parameters.put("driverClassName", value);
       return this;
     }
 
-    public DataGenerator.Builder setConnectionUrl(String value) {
+    public Builder setConnectionUrl(String value) {
       parameters.put("connectionUrl", value);
       return this;
     }
 
-    public DataGenerator.Builder setUsername(String value) {
+    public Builder setUsername(String value) {
       parameters.put("username", value);
       return this;
     }
 
-    public DataGenerator.Builder setPassword(String value) {
+    public Builder setPassword(String value) {
       parameters.put("password", value);
       return this;
     }
 
-    public DataGenerator.Builder setConnectionProperties(String value) {
+    public Builder setConnectionProperties(String value) {
       parameters.put("connectionProperties", value);
       return this;
     }
 
-    public DataGenerator.Builder setStatement(String value) {
+    public Builder setStatement(String value) {
       parameters.put("statement", value);
       return this;
     }
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java
index 78fa7543150..ef67a5a5c4f 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java
@@ -61,8 +61,7 @@ public class DatastoreAsserts {
   /**
    * Creates a {@link RecordsSubject} to assert information within a list of records.
    *
-   * @param results Records in Datastore {@link com.google.cloud.datastore.Entity} format to use in
-   *     the comparison.
+   * @param results Records in Datastore {@link Entity} format to use in the comparison.
    * @return Truth subject to chain assertions.
    */
   public static RecordsSubject assertThatDatastoreRecords(Collection<Entity> results) {
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java
index de818a1bbff..f59794af3e1 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java
@@ -113,9 +113,8 @@ public class DlpResourceManager implements ResourceManager {
    * @param project the GCP project ID
    * @return a new instance of Builder
    */
-  public static DlpResourceManager.Builder builder(
-      String project, CredentialsProvider credentialsProvider) {
-    return new DlpResourceManager.Builder(project, credentialsProvider);
+  public static Builder builder(String project, CredentialsProvider credentialsProvider) {
+    return new Builder(project, credentialsProvider);
   }
 
   /** A builder class for creating instances of {@link DlpResourceManager}. */
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
index 2cad6d0b9fa..7e1a403c735 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
@@ -72,9 +72,8 @@ public class KMSResourceManager implements ResourceManager {
     this.keyRing = null;
   }
 
-  public static KMSResourceManager.Builder builder(
-      String projectId, CredentialsProvider credentialsProvider) {
-    return new KMSResourceManager.Builder(projectId, credentialsProvider);
+  public static Builder builder(String projectId, CredentialsProvider credentialsProvider) {
+    return new Builder(projectId, credentialsProvider);
   }
 
   /**
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java
index 06591ea4fe0..0fc5614a363 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java
@@ -150,8 +150,8 @@ public final class MonitoringClient {
     Aggregation aggregation =
         Aggregation.newBuilder()
             .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
-            .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN)
-            .setCrossSeriesReducer(Aggregation.Reducer.REDUCE_MEAN)
+            .setPerSeriesAligner(Aligner.ALIGN_MEAN)
+            .setCrossSeriesReducer(Reducer.REDUCE_MEAN)
             .addGroupByFields("resource.instance_id")
             .build();
     ListTimeSeriesRequest request =
@@ -188,7 +188,7 @@ public final class MonitoringClient {
     Aggregation aggregation =
         Aggregation.newBuilder()
             .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
-            .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN)
+            .setPerSeriesAligner(Aligner.ALIGN_MEAN)
             .setCrossSeriesReducer(Reducer.REDUCE_MAX)
             .build();
     ListTimeSeriesRequest request =
@@ -225,7 +225,7 @@ public final class MonitoringClient {
     Aggregation aggregation =
         Aggregation.newBuilder()
             .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
-            .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN)
+            .setPerSeriesAligner(Aligner.ALIGN_MEAN)
             .setCrossSeriesReducer(Reducer.REDUCE_MAX)
             .build();
     ListTimeSeriesRequest request =
@@ -269,7 +269,7 @@ public final class MonitoringClient {
     Aggregation aggregation =
         Aggregation.newBuilder()
             .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
-            .setPerSeriesAligner(Aggregation.Aligner.ALIGN_RATE)
+            .setPerSeriesAligner(Aligner.ALIGN_RATE)
             .build();
     ListTimeSeriesRequest request =
         ListTimeSeriesRequest.newBuilder()
@@ -312,7 +312,7 @@ public final class MonitoringClient {
     Aggregation aggregation =
         Aggregation.newBuilder()
             .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
-            .setPerSeriesAligner(Aggregation.Aligner.ALIGN_RATE)
+            .setPerSeriesAligner(Aligner.ALIGN_RATE)
             .build();
     ListTimeSeriesRequest request =
         ListTimeSeriesRequest.newBuilder()
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java
index c9964d16f3b..5a101e08d37 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java
@@ -17,13 +17,17 @@
  */
 package org.apache.beam.it.gcp.spanner.matchers;
 
+import static org.apache.beam.it.gcp.artifacts.utils.JsonTestUtil.parseJsonString;
 import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatRecords;
 
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Type.Code;
 import com.google.cloud.spanner.Value;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -81,6 +85,47 @@ public class SpannerAsserts {
     }
   }
 
+  /**
+   * Convert a list of Spanner {@link Mutation} objects into a list of maps, extracting specified
+   * columns.
+   *
+   * @param mutations The list of mutations to process.
+   * @param columns The columns to extract.
+   * @return List of maps to use in {@link RecordsSubject}
+   */
+  public static List<Map<String, Object>> mutationsToRecords(
+      List<Mutation> mutations, List<String> columns) {
+    try {
+      List<Map<String, Object>> records = new ArrayList<>();
+      mutations.forEach(
+          entry -> {
+            records.add(
+                entry.asMap().entrySet().stream()
+                    .filter((e) -> columns.contains(e.getKey()))
+                    .collect(
+                        Collectors.toMap(
+                            Map.Entry::getKey,
+                            (e) -> {
+                              if (e.getValue().getType().getCode() == Code.ARRAY) {
+                                return e.getValue().getAsStringList();
+                              }
+                              if (Arrays.asList(Code.JSON, Code.PG_JSONB)
+                                  .contains(e.getValue().getType().getCode())) {
+                                try {
+                                  return parseJsonString(e.getValue().getJson());
+                                } catch (IOException ex) {
+                                  throw new RuntimeException(ex);
+                                }
+                              }
+                              return e.getValue().getAsString();
+                            })));
+          });
+      return records;
+    } catch (Exception e) {
+      throw new RuntimeException("Error converting TableResult to Records", e);
+    }
+  }
+
   /**
    * Creates a {@link RecordsSubject} to assert information within a list of records.
    *
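
Usage sketch for the new mutationsToRecords helper (illustrative only, not part of the patch; table and column names are made up):

    // Project each mutation onto the columns under test, then assert on the generic records.
    List<Mutation> mutations =
        Arrays.asList(
            Mutation.newInsertOrUpdateBuilder("Users").set("Id").to(1L).set("Name").to("alice").build());
    List<Map<String, Object>> records =
        SpannerAsserts.mutationsToRecords(mutations, Arrays.asList("Id", "Name"));
    assertThatRecords(records); // RecordsSubject, ready for chained assertions
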
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
index e232ed31cb5..a6516863b8d 100644
--- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
@@ -59,7 +59,7 @@ import org.junit.Test;
  * -DfailIfNoTests=false".
  *
  * <p>Example trigger command for specific test: "mvn test -pl it/google-cloud-platform -am \
- * -Dtest="BigTableIOLT#testWriteAndRead" -Dconfiguration=local -Dproject=[gcpProject] \
+ * -Dtest="BigTableIOLT#testBigtableWriteAndRead" -Dconfiguration=local -Dproject=[gcpProject] \
  * -DartifactBucket=[temp bucket] -DfailIfNoTests=false".
  */
 public class BigTableIOLT extends IOLoadTestBase {
@@ -67,7 +67,7 @@ public class BigTableIOLT extends IOLoadTestBase {
   private static final String COLUMN_FAMILY_NAME = "cf";
   private static final long TABLE_MAX_AGE_MINUTES = 100L;
 
-  private static BigtableResourceManager resourceManager;
+  private BigtableResourceManager resourceManager;
   private static final String READ_ELEMENT_METRIC_NAME = "read_count";
   private Configuration configuration;
   private String tableId;
@@ -114,7 +114,7 @@ public class BigTableIOLT extends IOLoadTestBase {
 
   /** Run integration test with configurations specified by TestProperties. */
   @Test
-  public void testWriteAndRead() throws IOException {
+  public void testBigtableWriteAndRead() throws IOException {
 
     tableId = generateTableId(testName);
     resourceManager.createTable(
@@ -205,7 +205,7 @@ public class BigTableIOLT extends IOLoadTestBase {
     return pipelineLauncher.launch(project, region, options);
   }
 
-  /** Options for Bigquery IO load test. */
+  /** Options for BigtableIO load test. */
   @AutoValue
   abstract static class Configuration {
     abstract Long getNumRows();
@@ -227,18 +227,18 @@ public class BigTableIOLT extends IOLoadTestBase {
 
     @AutoValue.Builder
     abstract static class Builder {
-      abstract Configuration.Builder setNumRows(long numRows);
+      abstract Builder setNumRows(long numRows);
 
-      abstract Configuration.Builder setPipelineTimeout(int timeOutMinutes);
+      abstract Builder setPipelineTimeout(int timeOutMinutes);
 
-      abstract Configuration.Builder setRunner(String runner);
+      abstract Builder setRunner(String runner);
 
-      abstract Configuration.Builder setValueSizeBytes(int valueSizeBytes);
+      abstract Builder setValueSizeBytes(int valueSizeBytes);
 
       abstract Configuration build();
     }
 
-    abstract Configuration.Builder toBuilder();
+    abstract Builder toBuilder();
   }
 
   /** Maps long number to the BigTable format record. */
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerIOLT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerIOLT.java
new file mode 100644
index 00000000000..949b863be3d
--- /dev/null
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerIOLT.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.gcp.spanner;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.cloud.ByteArray;
+import com.google.cloud.spanner.Mutation;
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.ParseException;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.beam.it.common.PipelineLauncher;
+import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.TestProperties;
+import org.apache.beam.it.common.utils.ResourceManagerUtils;
+import org.apache.beam.it.gcp.IOLoadTestBase;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * SpannerIO performance tests.
+ *
+ * <p>Example trigger command for all tests: "mvn test -pl it/google-cloud-platform -am
+ * -Dtest=SpannerIOLT \ -Dproject=[gcpProject] -DartifactBucket=[temp bucket]
+ * -DfailIfNoTests=false".
+ *
+ * <p>Example trigger command for specific test: "mvn test -pl it/google-cloud-platform -am \
+ * -Dtest="SpannerIOLT#testSpannerWriteAndRead" -Dconfiguration=local -Dproject=[gcpProject] \
+ * -DartifactBucket=[temp bucket] -DfailIfNoTests=false".
+ */
+public class SpannerIOLT extends IOLoadTestBase {
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+  private String tableName;
+  private SpannerResourceManager resourceManager;
+  private Configuration configuration;
+  private static final String READ_ELEMENT_METRIC_NAME = "read_count";
+
+  @Before
+  public void setup() throws IOException {
+    // generate a random table name
+    tableName =
+        "io_spanner_"
+            + DateTimeFormatter.ofPattern("MMddHHmmssSSS")
+                .withZone(ZoneId.of("UTC"))
+                .format(java.time.Instant.now())
+            + UUID.randomUUID().toString().replace("-", "").substring(0, 10);
+
+    resourceManager = SpannerResourceManager.builder(testName, project, region).build();
+
+    // parse configuration
+    String testConfig =
+        TestProperties.getProperty("configuration", "local", TestProperties.Type.PROPERTY);
+    configuration = TEST_CONFIGS_PRESET.get(testConfig);
+    if (configuration == null) {
+      try {
+        configuration = Configuration.fromJsonString(testConfig, Configuration.class);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown test configuration: [%s]. Pass to a valid configuration json, or use"
+                    + " config presets: %s",
+                testConfig, TEST_CONFIGS_PRESET.keySet()));
+      }
+    }
+    // prepare schema
+    String createTable =
+        createTableStatement(
+            tableName, configuration.numColumns, (int) configuration.valueSizeBytes);
+    // Create table
+    resourceManager.executeDdlStatement(createTable);
+  }
+
+  @After
+  public void teardown() {
+    ResourceManagerUtils.cleanResources(resourceManager);
+  }
+
+  private static final Map<String, Configuration> TEST_CONFIGS_PRESET;
+
+  static {
+    try {
+      TEST_CONFIGS_PRESET =
+          ImmutableMap.of(
+              "local",
+              Configuration.fromJsonString(
+                  "{\"numRecords\":1000,\"valueSizeBytes\":1000,\"pipelineTimeout\":2,\"runner\":\"DirectRunner\"}",
+                  Configuration.class), // 1 MB
+              "medium",
+              Configuration.fromJsonString(
+                  "{\"numRecords\":10000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\"}",
+                  Configuration.class), // 10 GB
+              "large",
+              Configuration.fromJsonString(
+                  "{\"numRecords\":100000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":80,\"runner\":\"DataflowRunner\"}",
+                  Configuration.class) // 100 GB
+              );
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testSpannerWriteAndRead() throws IOException {
+    PipelineLauncher.LaunchInfo writeInfo = testWrite();
+    PipelineOperator.Result writeResult =
+        pipelineOperator.waitUntilDone(
+            createConfig(writeInfo, Duration.ofMinutes(configuration.pipelineTimeout)));
+    assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, writeResult);
+
+    PipelineLauncher.LaunchInfo readInfo = testRead();
+    PipelineOperator.Result result =
+        pipelineOperator.waitUntilDone(
+            createConfig(readInfo, Duration.ofMinutes(configuration.pipelineTimeout)));
+    assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, result);
+    assertEquals(
+        PipelineLauncher.JobState.DONE,
+        pipelineLauncher.getJobStatus(project, region, readInfo.jobId()));
+    double numRecords =
+        pipelineLauncher.getMetric(
+            project,
+            region,
+            readInfo.jobId(),
+            getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));
+    assertEquals(configuration.numRecords, numRecords, 0.5);
+
+    // export metrics
+    MetricsConfiguration metricsConfig =
+        MetricsConfiguration.builder()
+            .setInputPCollection("Map records.out0")
+            .setInputPCollectionV2("Map records/ParMultiDo(GenerateMutations).out0")
+            .setOutputPCollection("Counting element.out0")
+            .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
+            .build();
+    try {
+      exportMetricsToBigQuery(writeInfo, getMetrics(writeInfo, metricsConfig));
+      exportMetricsToBigQuery(readInfo, getMetrics(readInfo, metricsConfig));
+    } catch (ParseException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private PipelineLauncher.LaunchInfo testWrite() throws IOException {
+    SpannerIO.Write writeTransform =
+        SpannerIO.write()
+            .withProjectId(project)
+            .withInstanceId(resourceManager.getInstanceId())
+            .withDatabaseId(resourceManager.getDatabaseId());
+
+    writePipeline
+        .apply(GenerateSequence.from(0).to(configuration.numRecords))
+        .apply(
+            "Map records",
+            ParDo.of(
+                new GenerateMutations(
+                    tableName, configuration.numColumns, (int) configuration.valueSizeBytes)))
+        .apply("Write to Spanner", writeTransform);
+
+    PipelineLauncher.LaunchConfig options =
+        PipelineLauncher.LaunchConfig.builder("write-spanner")
+            .setSdk(PipelineLauncher.Sdk.JAVA)
+            .setPipeline(writePipeline)
+            .addParameter("runner", configuration.runner)
+            .build();
+
+    return pipelineLauncher.launch(project, region, options);
+  }
+
+  private PipelineLauncher.LaunchInfo testRead() throws IOException {
+    SpannerIO.Read readTransform =
+        SpannerIO.read()
+            .withProjectId(project)
+            .withInstanceId(resourceManager.getInstanceId())
+            .withDatabaseId(resourceManager.getDatabaseId())
+            .withQuery(String.format("SELECT * FROM %s", tableName));
+
+    readPipeline
+        .apply("Read from Spanner", readTransform)
+        .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME)));
+
+    PipelineLauncher.LaunchConfig options =
+        PipelineLauncher.LaunchConfig.builder("read-spanner")
+            .setSdk(PipelineLauncher.Sdk.JAVA)
+            .setPipeline(readPipeline)
+            .addParameter("runner", configuration.runner)
+            .build();
+
+    return pipelineLauncher.launch(project, region, options);
+  }
+
+  /** Options for SpannerIO load test. */
+  static class Configuration extends SyntheticSourceOptions {
+
+    /**
+     * Number of columns (besides the primary key) of each record. The column size is equally
+     * distributed as valueSizeBytes/numColumns.
+     */
+    @JsonProperty public int numColumns = 1;
+
+    /** Pipeline timeout in minutes. Must be a positive value. */
+    @JsonProperty public int pipelineTimeout = 20;
+
+    /** Runner specified to run the pipeline. */
+    @JsonProperty public String runner = "DirectRunner";
+  }
+
+  /**
+   * Generate a create table sql statement with 1 integer column (Id) and additional numBytesCol
+   * columns.
+   */
+  static String createTableStatement(String tableId, int numBytesCol, int valueSizeBytes) {
+    int sizePerCol = valueSizeBytes / numBytesCol;
+    StringBuilder statement = new StringBuilder();
+    statement.append(String.format("CREATE TABLE %s (Id INT64", tableId));
+    for (int col = 0; col < numBytesCol; ++col) {
+      statement.append(String.format(",\n COL%d BYTES(%d)", col + 1, sizePerCol));
+    }
+    statement.append(") PRIMARY KEY(Id)");
+    return statement.toString();
+  }
+
+  /** Maps long number to the Spanner format record. */
+  private static class GenerateMutations extends DoFn<Long, Mutation> implements Serializable {
+    private final String table;
+    private final int numBytesCol;
+    private final int sizePerCol;
+
+    public GenerateMutations(String tableId, int numBytesCol, int valueSizeBytes) {
+      checkArgument(valueSizeBytes >= numBytesCol);
+      this.table = tableId;
+      this.numBytesCol = numBytesCol;
+      this.sizePerCol = valueSizeBytes / numBytesCol;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Mutation.WriteBuilder builder = Mutation.newInsertOrUpdateBuilder(table);
+      Long key = Objects.requireNonNull(c.element());
+      builder.set("Id").to(key);
+      Random random = new Random(key);
+      byte[] value = new byte[sizePerCol];
+      for (int col = 0; col < numBytesCol; ++col) {
+        String name = String.format("COL%d", col + 1);
+        random.nextBytes(value);
+        builder.set(name).to(ByteArray.copyFrom(value));
+      }
+      Mutation mutation = builder.build();
+      c.output(mutation);
+    }
+  }
+}
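
Worked example (not part of the file above): createTableStatement splits valueSizeBytes evenly across the value columns.

    String ddl = createTableStatement("io_spanner_test", 2, 1000); // sizePerCol = 1000 / 2 = 500
    // ddl:
    // CREATE TABLE io_spanner_test (Id INT64,
    //  COL1 BYTES(500),
    //  COL2 BYTES(500)) PRIMARY KEY(Id)
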
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
index 704f8337c66..a36f3b340e8 100644
--- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
@@ -90,7 +90,7 @@ public class FileBasedIOLT extends IOLoadTestBase {
 
   @Rule public TestPipeline readPipeline = TestPipeline.create();
 
-  private static final Map<String, FileBasedIOLT.Configuration> TEST_CONFIGS_PRESET;
+  private static final Map<String, Configuration> TEST_CONFIGS_PRESET;
 
   static {
     try {
diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/AbstractJDBCResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/AbstractJDBCResourceManager.java
index b57185b70eb..6d50dddb0cc 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/AbstractJDBCResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/AbstractJDBCResourceManager.java
@@ -196,7 +196,8 @@ public abstract class AbstractJDBCResourceManager<T extends JdbcDatabaseContaine
             valueList.add(null);
           } else if (NumberUtils.isCreatable(value.toString())
               || "true".equalsIgnoreCase(value.toString())
-              || "false".equalsIgnoreCase(value.toString())) {
+              || "false".equalsIgnoreCase(value.toString())
+              || value.toString().startsWith("ARRAY[")) {
             valueList.add(String.valueOf(value));
           } else {
             valueList.add("'" + value + "'");
@@ -226,34 +227,9 @@ public abstract class AbstractJDBCResourceManager<T extends JdbcDatabaseContaine
   @SuppressWarnings("nullness")
   public List<Map<String, Object>> readTable(String tableName) {
     LOG.info("Reading all rows from {}.{}", databaseName, tableName);
-
-    List<Map<String, Object>> resultSet = new ArrayList<>();
-
-    StringBuilder sql = new StringBuilder();
-    try (Connection con = driver.getConnection(getUri(), username, password)) {
-      Statement stmt = con.createStatement();
-
-      sql.append("SELECT * FROM ").append(tableName);
-      ResultSet result = stmt.executeQuery(sql.toString());
-
-      while (result.next()) {
-        Map<String, Object> row = new HashMap<>();
-        ResultSetMetaData metadata = result.getMetaData();
-        // Columns list in table metadata is 1-indexed
-        for (int i = 1; i <= metadata.getColumnCount(); i++) {
-          row.put(metadata.getColumnName(i), result.getObject(i));
-        }
-        resultSet.add(row);
-      }
-      result.close();
-      stmt.close();
-    } catch (Exception e) {
-      throw new JDBCResourceManagerException(
-          "Failed to fetch rows from table. SQL statement: " + sql, e);
-    }
-
+    List<Map<String, Object>> result = runSQLQuery(String.format("SELECT * FROM %s", tableName));
     LOG.info("Successfully loaded rows from {}.{}", databaseName, tableName);
-    return resultSet;
+    return result;
   }
 
   @Override
@@ -290,9 +266,21 @@ public abstract class AbstractJDBCResourceManager<T extends JdbcDatabaseContaine
   }
 
   @Override
-  public synchronized ResultSet runSQLQuery(String sql) {
+  @SuppressWarnings("nullness")
+  public synchronized List<Map<String, Object>> runSQLQuery(String sql) {
     try (Statement stmt = driver.getConnection(getUri(), username, password).createStatement()) {
-      return stmt.executeQuery(sql);
+      List<Map<String, Object>> result = new ArrayList<>();
+      ResultSet resultSet = stmt.executeQuery(sql);
+      while (resultSet.next()) {
+        Map<String, Object> row = new HashMap<>();
+        ResultSetMetaData metadata = resultSet.getMetaData();
+        // Columns list in table metadata is 1-indexed
+        for (int i = 1; i <= metadata.getColumnCount(); i++) {
+          row.put(metadata.getColumnName(i), resultSet.getObject(i));
+        }
+        result.add(row);
+      }
+      return result;
     } catch (Exception e) {
       throw new JDBCResourceManagerException("Failed to execute SQL statement: " + sql, e);
     }
@@ -307,6 +295,21 @@ public abstract class AbstractJDBCResourceManager<T extends JdbcDatabaseContaine
     }
   }
 
+  @Override
+  public synchronized long getRowCount(String tableName) {
+    try (Connection con = driver.getConnection(getUri(), username, password)) {
+      Statement stmt = con.createStatement();
+      ResultSet resultSet = stmt.executeQuery(String.format("SELECT count(*) FROM %s", tableName));
+      resultSet.next();
+      long rows = resultSet.getLong(1);
+      resultSet.close();
+      stmt.close();
+      return rows;
+    } catch (Exception e) {
+      throw new JDBCResourceManagerException("Failed to get row count from " + 
tableName, e);
+    }
+  }
+
   /**
    * Builder for {@link AbstractJDBCResourceManager}.
    *
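
Usage sketch for the reworked JDBC helpers (illustrative only, not part of the patch; the table name is hypothetical). runSQLQuery now materializes rows into maps instead of returning a live ResultSet, and getRowCount wraps a COUNT query.

    // Rows come back as column-name -> value maps; no ResultSet lifecycle to manage in tests.
    List<Map<String, Object>> rows = resourceManager.runSQLQuery("SELECT * FROM company");
    long count = resourceManager.getRowCount("company"); // issues SELECT count(*) FROM company
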
diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/JDBCResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/JDBCResourceManager.java
index 9292d4cb42e..deb29ff3a5e 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/JDBCResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/JDBCResourceManager.java
@@ -19,7 +19,6 @@ package org.apache.beam.it.jdbc;
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
-import java.sql.ResultSet;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.it.common.ResourceManager;
@@ -102,7 +101,7 @@ public interface JDBCResourceManager extends ResourceManager {
    * @param sql The SQL query to run.
    * @return A ResultSet containing the result of the execution.
    */
-  ResultSet runSQLQuery(String sql);
+  List<Map<String, Object>> runSQLQuery(String sql);
 
   /**
    * Run the given SQL DML statement (INSERT, UPDATE and DELETE).
@@ -111,6 +110,14 @@ public interface JDBCResourceManager extends ResourceManager {
    */
   void runSQLUpdate(String sql);
 
+  /**
+   * Gets the number of rows in table.
+   *
+   * @param tableName The name of the table.
+   * @return a count of number of rows in the table.
+   */
+  long getRowCount(String tableName);
+
   /** Object for managing JDBC table schemas in {@link JDBCResourceManager} instances. */
   class JDBCSchema {
 
diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java
index c515b2c4844..0bcb16c6109 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java
@@ -61,14 +61,13 @@ public class MSSQLResourceManager
   }
 
   @VisibleForTesting
-  <T extends MSSQLResourceManager.DefaultMSSQLServerContainer<T>> MSSQLResourceManager(
-      T container, Builder builder) {
+  <T extends DefaultMSSQLServerContainer<T>> MSSQLResourceManager(T container, Builder builder) {
     super(container, builder);
     initialized = true;
   }
 
-  public static MSSQLResourceManager.Builder builder(String testId) {
-    return new MSSQLResourceManager.Builder(testId);
+  public static Builder builder(String testId) {
+    return new Builder(testId);
   }
 
   private synchronized void createDatabase(String databaseName) {
diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java
index 688c26dfb56..e1bf3640b53 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java
@@ -49,8 +49,8 @@ public class MySQLResourceManager extends AbstractJDBCResourceManager<MySQLConta
     super(container, builder);
   }
 
-  public static MySQLResourceManager.Builder builder(String testId) {
-    return new MySQLResourceManager.Builder(testId);
+  public static Builder builder(String testId) {
+    return new Builder(testId);
   }
 
   @Override
diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java
index 8054d26c33f..f44e939936d 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java
@@ -45,7 +45,7 @@ public class OracleResourceManager extends AbstractJDBCResourceManager<OracleCon
   private static final String DEFAULT_ORACLE_USERNAME = "testUser";
   private static final String DEFAULT_ORACLE_PASSWORD = "testPassword";
 
-  private OracleResourceManager(OracleResourceManager.Builder builder) {
+  private OracleResourceManager(Builder builder) {
     this(
         new OracleContainer(
             DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
@@ -53,12 +53,12 @@ public class OracleResourceManager extends AbstractJDBCResourceManager<OracleCon
   }
 
   @VisibleForTesting
-  OracleResourceManager(OracleContainer container, OracleResourceManager.Builder builder) {
+  OracleResourceManager(OracleContainer container, Builder builder) {
     super(container, builder);
   }
 
-  public static OracleResourceManager.Builder builder(String testId) {
-    return new OracleResourceManager.Builder(testId);
+  public static Builder builder(String testId) {
+    return new Builder(testId);
   }
 
   @Override
diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java
index 7f054dfbc5d..26bdff2305b 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java
@@ -38,21 +38,13 @@ public class PostgresResourceManager extends AbstractJDBCResourceManager<Postgre
   // https://hub.docker.com/_/postgres/tags?tab=tags
   private static final String DEFAULT_POSTGRES_CONTAINER_TAG = "15.1";
 
-  private PostgresResourceManager(PostgresResourceManager.Builder builder) {
-    this(
-        new PostgreSQLContainer<>(
-            DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
-        builder);
-  }
-
   @VisibleForTesting
-  PostgresResourceManager(
-      PostgreSQLContainer<?> container, PostgresResourceManager.Builder builder) {
+  PostgresResourceManager(PostgreSQLContainer<?> container, Builder builder) {
     super(container, builder);
   }
 
-  public static PostgresResourceManager.Builder builder(String testId) {
-    return new PostgresResourceManager.Builder(testId);
+  public static Builder builder(String testId) {
+    return new Builder(testId);
   }
 
   @Override
@@ -80,7 +72,11 @@ public class PostgresResourceManager extends AbstractJDBCResourceManager<Postgre
 
     @Override
     public PostgresResourceManager build() {
-      return new PostgresResourceManager(this);
+      PostgreSQLContainer<?> container =
+          new PostgreSQLContainer<>(
+              DockerImageName.parse(containerImageName).withTag(containerImageTag));
+      container.setCommand("postgres", "-c", "fsync=off", "-c", "max_connections=1000");
+      return new PostgresResourceManager(container, this);
     }
   }
 }
diff --git a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
index 7f7fb5b6956..d9a647dbeeb 100644
--- a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
+++ b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
@@ -71,16 +71,13 @@ public class KafkaResourceManager extends 
TestContainerResourceManager<GenericCo
   private final String connectionString;
   private final boolean usingStaticTopic;
 
-  private KafkaResourceManager(KafkaResourceManager.Builder builder) {
+  private KafkaResourceManager(Builder builder) {
     this(null, new DefaultKafkaContainer(builder), builder);
   }
 
   @VisibleForTesting
   @SuppressWarnings("nullness")
-  KafkaResourceManager(
-      @Nullable AdminClient client,
-      KafkaContainer container,
-      KafkaResourceManager.Builder builder) {
+  KafkaResourceManager(@Nullable AdminClient client, KafkaContainer container, 
Builder builder) {
     super(container, builder);
 
     this.usingStaticTopic = builder.topicNames.size() > 0;
@@ -105,8 +102,8 @@ public class KafkaResourceManager extends 
TestContainerResourceManager<GenericCo
             : AdminClient.create(ImmutableMap.of("bootstrap.servers", 
this.connectionString));
   }
 
-  public static KafkaResourceManager.Builder builder(String testId) {
-    return new KafkaResourceManager.Builder(testId);
+  public static Builder builder(String testId) {
+    return new Builder(testId);
   }
 
   /** Returns the kafka bootstrap server connection string. */
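
    The Kafka constructor keeps its previous behaviour: a non-empty topic list in the builder
    switches the manager to static-topic mode, and when no AdminClient is injected one is
    created against the container's bootstrap servers. A self-contained sketch of that
    client-creation pattern with the plain Kafka admin API (the bootstrap address would come
    from the manager's connection string):

        import java.util.HashMap;
        import java.util.Map;
        import org.apache.kafka.clients.admin.AdminClient;

        static void smokeCheck(String bootstrapServers) throws Exception {
          Map<String, Object> conf = new HashMap<>();
          conf.put("bootstrap.servers", bootstrapServers);
          // Same construction the resource manager falls back to when no client is supplied.
          try (AdminClient admin = AdminClient.create(conf)) {
            admin.listTopics().names().get(); // blocks until the broker answers
          }
        }
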
diff --git 
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
 
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
index 80216b14ac0..ed0e556bf0d 100644
--- 
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
+++ 
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
@@ -69,7 +69,7 @@ public class MongoDBResourceManager extends 
TestContainerResourceManager<MongoDB
   private final String connectionString;
   private final boolean usingStaticDatabase;
 
-  private MongoDBResourceManager(MongoDBResourceManager.Builder builder) {
+  private MongoDBResourceManager(Builder builder) {
     this(
         /* mongoClient= */ null,
         new MongoDBContainer(
@@ -80,9 +80,7 @@ public class MongoDBResourceManager extends 
TestContainerResourceManager<MongoDB
   @VisibleForTesting
   @SuppressWarnings("nullness")
   MongoDBResourceManager(
-      @Nullable MongoClient mongoClient,
-      MongoDBContainer container,
-      MongoDBResourceManager.Builder builder) {
+      @Nullable MongoClient mongoClient, MongoDBContainer container, Builder 
builder) {
     super(container, builder);
 
     this.usingStaticDatabase = builder.databaseName != null;
@@ -93,8 +91,8 @@ public class MongoDBResourceManager extends 
TestContainerResourceManager<MongoDB
     this.mongoClient = mongoClient == null ? 
MongoClients.create(connectionString) : mongoClient;
   }
 
-  public static MongoDBResourceManager.Builder builder(String testId) {
-    return new MongoDBResourceManager.Builder(testId);
+  public static Builder builder(String testId) {
+    return new Builder(testId);
   }
 
   /** Returns the URI connection string to the MongoDB Database. */
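
    As with Kafka, the MongoDB constructor still builds its own client from the container's
    connection string when none is injected. A sketch of that fallback with the synchronous
    driver (the URI is a placeholder):

        import com.mongodb.client.MongoClient;
        import com.mongodb.client.MongoClients;

        // Same fallback the resource manager uses when mongoClient == null.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
          client.listDatabaseNames().forEach(System.out::println);
        }
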
diff --git 
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
 
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
index 1a1b86acf56..ec08854b5c6 100644
--- 
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
+++ 
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
@@ -31,7 +31,7 @@ import org.bson.Document;
 public class MongoDBAsserts {
 
   /**
-   * Convert MongoDB {@link org.bson.Document} to a list of maps.
+   * Convert MongoDB {@link Document} to a list of maps.
    *
    * @param documents List of Documents to parse
    * @return List of maps to use in {@link RecordsSubject}
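
    The helper documented above bridges MongoDB results into the shared truth matchers. Because
    org.bson.Document already implements Map<String, Object>, the conversion amounts to a
    defensive copy per document; a sketch of the idea (the helper's actual name is not visible
    in this hunk):

        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import org.bson.Document;

        static List<Map<String, Object>> toRecords(List<Document> documents) {
          List<Map<String, Object>> records = new ArrayList<>();
          for (Document document : documents) {
            records.add(new HashMap<>(document)); // Document is itself a Map<String, Object>
          }
          return records;
        }
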
diff --git 
a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java 
b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java
index 97bcca9e84b..835be71ce0f 100644
--- a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java
+++ b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java
@@ -66,7 +66,7 @@ public class Neo4jResourceManager extends 
TestContainerResourceManager<Neo4jCont
 
   private final String adminPassword;
 
-  private Neo4jResourceManager(Neo4jResourceManager.Builder builder) {
+  private Neo4jResourceManager(Builder builder) {
     this(
         builder.driver,
         new Neo4jContainer<>(
@@ -79,10 +79,7 @@ public class Neo4jResourceManager extends 
TestContainerResourceManager<Neo4jCont
 
   @VisibleForTesting
   @SuppressWarnings("nullness")
-  Neo4jResourceManager(
-      @Nullable Driver neo4jDriver,
-      Neo4jContainer<?> container,
-      Neo4jResourceManager.Builder builder) {
+  Neo4jResourceManager(@Nullable Driver neo4jDriver, Neo4jContainer<?> 
container, Builder builder) {
     super(container, builder);
 
     this.adminPassword = builder.adminPassword;
@@ -101,8 +98,8 @@ public class Neo4jResourceManager extends 
TestContainerResourceManager<Neo4jCont
     }
   }
 
-  public static Neo4jResourceManager.Builder builder(String testId) {
-    return new Neo4jResourceManager.Builder(testId);
+  public static Builder builder(String testId) {
+    return new Builder(testId);
   }
 
   /** Returns the URI connection string to the Neo4j Database. */
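
    The Neo4j manager likewise accepts an injected Driver for tests and records the admin
    password from the builder. Against a running container, opening a session with the standard
    Neo4j Java driver looks roughly like this (URI and credentials are placeholders; in a test
    they would come from the resource manager):

        import org.neo4j.driver.AuthTokens;
        import org.neo4j.driver.Driver;
        import org.neo4j.driver.GraphDatabase;
        import org.neo4j.driver.Session;

        try (Driver driver =
                GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "letmein1!"));
            Session session = driver.session()) {
          session.run("RETURN 1").consume(); // trivial round trip to verify connectivity
        }
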
diff --git 
a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java
 
b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java
index 32e283edb72..16f25133842 100644
--- 
a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java
+++ 
b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java
@@ -20,6 +20,9 @@ package org.apache.beam.it.neo4j.conditions;
 import com.google.auto.value.AutoValue;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.it.conditions.ConditionCheck;
 import org.apache.beam.it.neo4j.Neo4jResourceManager;
@@ -55,9 +58,17 @@ public abstract class Neo4jQueryCheck extends ConditionCheck 
{
     if (actualResult == null) {
       return new CheckResult(expectedResult == null);
     }
+
+    Set<Map<String, Object>> sortedActualResult = sort(actualResult);
+    Set<Map<String, Object>> sortedExpectedResult = sort(expectedResult);
+
     return new CheckResult(
-        actualResult.equals(expectedResult),
-        String.format("Expected %s to equal %s", actualResult, 
expectedResult));
+        sortedActualResult.equals(sortedExpectedResult),
+        String.format("Expected %s to equal %s", sortedActualResult, 
sortedExpectedResult));
+  }
+
+  private static Set<Map<String, Object>> sort(List<Map<String, Object>> list) 
{
+    return list.stream().map(TreeMap::new).collect(Collectors.toSet());
   }
 
   public static Builder builder(Neo4jResourceManager resourceManager) {
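
    The comparison change above is the behavioural part of this file: each row is copied into a
    TreeMap, so failure messages print keys in a stable sorted order, and the rows are collected
    into Sets, so the check no longer depends on the order in which Neo4j returns them. A Set
    also collapses duplicate rows, which only matters if an expectation deliberately repeats one.
    The same predicate written in isolation:

        import java.util.List;
        import java.util.Map;
        import java.util.Set;
        import java.util.TreeMap;
        import java.util.stream.Collectors;

        static boolean sameRowsIgnoringOrder(
            List<Map<String, Object>> actual, List<Map<String, Object>> expected) {
          Set<Map<String, Object>> a =
              actual.stream().map(TreeMap::new).collect(Collectors.toSet());
          Set<Map<String, Object>> e =
              expected.stream().map(TreeMap::new).collect(Collectors.toSet());
          // True regardless of row order; key order never mattered, since Map.equals
          // compares entry sets.
          return a.equals(e);
        }
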
diff --git 
a/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java 
b/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java
index 1ef4726df43..0115a791eef 100644
--- 
a/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java
+++ 
b/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java
@@ -85,7 +85,7 @@ public class SplunkResourceManager extends 
TestContainerResourceManager<SplunkCo
   private final SplunkClientFactory clientFactory;
 
   @SuppressWarnings("resource")
-  private SplunkResourceManager(SplunkResourceManager.Builder builder) {
+  private SplunkResourceManager(Builder builder) {
     this(
         new SplunkClientFactory(),
         new SplunkContainer(
@@ -98,9 +98,7 @@ public class SplunkResourceManager extends 
TestContainerResourceManager<SplunkCo
   @VisibleForTesting
   @SuppressWarnings("nullness")
   SplunkResourceManager(
-      SplunkClientFactory clientFactory,
-      SplunkContainer container,
-      SplunkResourceManager.Builder builder) {
+      SplunkClientFactory clientFactory, SplunkContainer container, Builder 
builder) {
     super(setup(container, builder), builder);
 
     String username = DEFAULT_SPLUNK_USERNAME;
@@ -169,8 +167,8 @@ public class SplunkResourceManager extends 
TestContainerResourceManager<SplunkCo
         .withPassword(builder.password);
   }
 
-  public static SplunkResourceManager.Builder builder(String testId) {
-    return new SplunkResourceManager.Builder(testId);
+  public static Builder builder(String testId) {
+    return new Builder(testId);
   }
 
   /**
diff --git 
a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
 
b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
index 77dd6da5805..098938a291d 100644
--- 
a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
+++ 
b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
@@ -32,11 +32,11 @@ import org.testcontainers.containers.GenericContainer;
  * resources.
  *
  * <p>Optionally, a static resource can be specified by calling the 
useStaticContainer() method in
- * the {@link TestContainerResourceManager.Builder} class. A static resource 
is a pre-configured
- * database or other resource that is ready to be connected to by the resource 
manager. This could
- * be a pre-existing TestContainer that has not been closed, a local database 
instance, a remote VM,
- * or any other source that can be connected to. If a static container is 
used, the host and port
- * must also be configured using the Builder's setHost() and setPort() 
methods, respectively.
+ * the {@link Builder} class. A static resource is a pre-configured database 
or other resource that
+ * is ready to be connected to by the resource manager. This could be a 
pre-existing TestContainer
+ * that has not been closed, a local database instance, a remote VM, or any 
other source that can be
+ * connected to. If a static container is used, the host and port must also be 
configured using the
+ * Builder's setHost() and setPort() methods, respectively.
  */
 public abstract class TestContainerResourceManager<T extends 
GenericContainer<?>>
     implements ResourceManager {
@@ -48,12 +48,11 @@ public abstract class TestContainerResourceManager<T 
extends GenericContainer<?>
   private final String host;
   protected int port;
 
-  protected <B extends TestContainerResourceManager.Builder<?>> 
TestContainerResourceManager(
-      T container, B builder) {
+  protected <B extends Builder<?>> TestContainerResourceManager(T container, B 
builder) {
     this(container, builder, null);
   }
 
-  protected <B extends TestContainerResourceManager.Builder<?>> 
TestContainerResourceManager(
+  protected <B extends Builder<?>> TestContainerResourceManager(
       T container, B builder, @Nullable Callable<Void> setup) {
     this.container = container;
     this.usingStaticContainer = builder.useStaticContainer;
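
    The reworded javadoc above describes the static-resource escape hatch in terms of the nested
    Builder. Concretely, with any of the managers touched in this patch it looks roughly like the
    following (MongoDB chosen arbitrarily; host and port are placeholders, and fluent chaining on
    the builder methods named in the javadoc is assumed):

        import org.apache.beam.it.mongodb.MongoDBResourceManager;

        // Reuse an already-running MongoDB instead of starting a new container.
        MongoDBResourceManager mongo =
            MongoDBResourceManager.builder("my-test")
                .useStaticContainer()
                .setHost("10.128.0.5")
                .setPort(27017)
                .build();
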
diff --git 
a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
 
b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
index 39a0c0cebed..75d5ce3a67c 100644
--- 
a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
+++ 
b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
@@ -81,7 +81,7 @@ public class RecordsSubject extends Subject {
     Map<String, Object> expected = convertMapToTreeMap(subset);
     for (Map<String, Object> candidate : actual) {
       boolean match = true;
-      for (Map.Entry<String, Object> entry : subset.entrySet()) {
+      for (Entry<String, Object> entry : subset.entrySet()) {
         if (!candidate.containsKey(entry.getKey())
             || !candidate.get(entry.getKey()).equals(entry.getValue())) {
           match = false;
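
    For context, the loop touched above implements a "contains at least these fields" match: a
    candidate record passes when every entry of the expected subset appears in it with an equal
    value. The same predicate written in isolation:

        import java.util.Map;

        static boolean containsSubset(Map<String, Object> candidate, Map<String, Object> subset) {
          // Mirrors the check above: every expected key must be present with an equal value.
          return subset.entrySet().stream()
              .allMatch(
                  entry ->
                      candidate.containsKey(entry.getKey())
                          && candidate.get(entry.getKey()).equals(entry.getValue()));
        }
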
