This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cfd9662aebf Update `it` module. (#28113)
cfd9662aebf is described below
commit cfd9662aebfb87497b117f870941ea47c589bbc9
Author: Pranav Bhandari <[email protected]>
AuthorDate: Wed Aug 23 13:16:18 2023 -0400
Update `it` module. (#28113)
---
.../it/cassandra/CassandraResourceManager.java | 28 +++++++----
.../it/cassandra/CassandraResourceManagerTest.java | 3 ++
.../ElasticsearchResourceManager.java | 27 +++++++++--
.../ElasticsearchResourceManagerTest.java | 3 ++
it/google-cloud-platform/build.gradle | 13 +++---
.../org/apache/beam/it/gcp/IOLoadTestBase.java | 2 +-
.../java/org/apache/beam/it/gcp/LoadTestBase.java | 6 +--
.../beam/it/gcp/artifacts/ArtifactClient.java | 8 ++++
.../it/gcp/bigtable/BigtableResourceManager.java | 54 ++++++++++++++++++----
.../it/gcp/dataflow/DefaultPipelineLauncher.java | 13 ++----
.../beam/it/gcp/datagenerator/DataGenerator.java | 4 +-
.../beam/it/gcp/datagenerator/package-info.java | 1 +
.../it/gcp/datastore/matchers/package-info.java | 1 +
.../apache/beam/it/gcp/kms/KMSResourceManager.java | 4 +-
.../beam/it/gcp/pubsub/PubsubResourceManager.java | 2 +
.../beam/it/gcp/storage/GcsResourceManager.java | 8 +++-
.../apache/beam/it/gcp/bigquery/BigQueryIOLT.java | 6 +--
.../gcp/bigquery/BigQueryResourceManagerTest.java | 3 +-
.../gcp/bigtable/BigtableResourceManagerTest.java | 47 ++++++++++---------
.../it/gcp/dataflow/ClassicTemplateClientTest.java | 2 +-
.../gcp/dataflow/DefaultPipelineLauncherTest.java | 2 +-
.../it/gcp/dataflow/FlexTemplateClientTest.java | 2 +-
.../gcp/datastore/DatastoreResourceManagerIT.java | 7 ++-
.../datastream/DatastreamResourceManagerTest.java | 3 +-
.../beam/it/gcp/kms/KMSResourceManagerIT.java | 3 +-
.../beam/it/gcp/kms/KMSResourceManagerTest.java | 3 +-
.../it/gcp/spanner/SpannerResourceManagerTest.java | 18 --------
.../apache/beam/it/gcp/storage/FileBasedIOLT.java | 2 +-
.../beam/it/jdbc/MSSQLResourceManagerTest.java | 2 +
.../beam/it/jdbc/MySQLResourceManagerTest.java | 2 +
.../beam/it/jdbc/OracleResourceManagerTest.java | 2 +
.../beam/it/jdbc/PostgresResourceManagerTest.java | 2 +
.../apache/beam/it/kafka/KafkaResourceManager.java | 3 +-
.../java/org/apache/beam/it/kafka/KafkaIOLT.java | 2 +-
.../beam/it/kafka/KafkaResourceManagerTest.java | 2 +
.../it/mongodb/MongoDBResourceManagerTest.java | 2 +
.../beam/it/neo4j/Neo4jResourceManagerTest.java | 2 +
.../beam/it/splunk/conditions/package-info.java | 22 +++++----
.../beam/it/splunk/matchers/package-info.java | 22 +++++----
.../org/apache/beam/it/splunk/package-info.java | 22 +++++----
.../beam/it/splunk/SplunkResourceManagerTest.java | 2 +
.../it/splunk/SplunkResourceManagerUtilsTest.java | 2 +-
.../TestContainerResourceManager.java | 7 ++-
.../TestContainerResourceManagerTest.java | 11 +++++
44 files changed, 240 insertions(+), 142 deletions(-)
diff --git
a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java
b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java
index dc55709fff0..f7bf15eb764 100644
---
a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java
+++
b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.it.common.ResourceManager;
+import org.apache.beam.it.common.utils.ExceptionUtils;
import org.apache.beam.it.testcontainers.TestContainerResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,12 +99,7 @@ public class CassandraResourceManager extends
TestContainerResourceManager<Gener
if (!usingStaticDatabase) {
// Keyspace request may timeout on a few environments, if Cassandra is
warming up
- Failsafe.with(
- RetryPolicy.builder()
- .withMaxRetries(5)
- .withDelay(Duration.ofSeconds(1))
- .handle(DriverTimeoutException.class)
- .build())
+ Failsafe.with(buildRetryPolicy())
.run(
() ->
this.cassandraClient.execute(
@@ -141,8 +137,11 @@ public class CassandraResourceManager extends
TestContainerResourceManager<Gener
LOG.info("Executing statement: {}", statement);
try {
- return cassandraClient.execute(
-
SimpleStatement.newInstance(statement).setKeyspace(this.keyspaceName));
+ return Failsafe.with(buildRetryPolicy())
+ .get(
+ () ->
+ cassandraClient.execute(
+
SimpleStatement.newInstance(statement).setKeyspace(this.keyspaceName)));
} catch (Exception e) {
throw new CassandraResourceManagerException("Error reading collection.",
e);
}
@@ -226,8 +225,9 @@ public class CassandraResourceManager extends
TestContainerResourceManager<Gener
} catch (Exception e) {
LOG.error("Failed to drop Cassandra keyspace {}.", keyspaceName, e);
- // Only bubble exception if the cause is not timeout, as it will be
dropped with container.
- if (e.getCause() == null || !(e.getCause() instanceof
DriverTimeoutException)) {
+ // Only bubble the exception if it is neither a timeout nor a "does not exist" error
+ if (!ExceptionUtils.containsType(e, DriverTimeoutException.class)
+ && !ExceptionUtils.containsMessage(e, "does not exist")) {
producedError = true;
}
}
@@ -277,6 +277,14 @@ public class CassandraResourceManager extends
TestContainerResourceManager<Gener
return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName,
columns, values);
}
+ private static RetryPolicy<Object> buildRetryPolicy() {
+ return RetryPolicy.builder()
+ .withMaxRetries(5)
+ .withDelay(Duration.ofSeconds(1))
+ .handle(DriverTimeoutException.class)
+ .build();
+ }
+
/** Builder for {@link CassandraResourceManager}. */
public static final class Builder
extends TestContainerResourceManager.Builder<CassandraResourceManager> {
diff --git
a/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
index ef5e8433434..fe00457159f 100644
---
a/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
+++
b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.cassandra;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -57,6 +58,8 @@ public class CassandraResourceManagerTest {
@Before
public void setUp() {
+ doReturn(container).when(container).withLogConsumer(any());
+
testManager =
new CassandraResourceManager(
cassandraClient, container,
CassandraResourceManager.builder(TEST_ID));
diff --git
a/it/elasticsearch/src/main/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManager.java
b/it/elasticsearch/src/main/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManager.java
index 775f0dd3105..c58875d7df8 100644
---
a/it/elasticsearch/src/main/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManager.java
+++
b/it/elasticsearch/src/main/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.elasticsearch;
import static
org.apache.beam.it.elasticsearch.ElasticsearchUtils.checkValidIndexName;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -48,6 +49,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;
@@ -81,11 +83,28 @@ public class ElasticsearchResourceManager extends
TestContainerResourceManager<G
private final List<String> managedIndexNames = new ArrayList<>();
private ElasticsearchResourceManager(Builder builder) {
- this(
- /* elasticsearchClient= */ null,
+ this(/* elasticsearchClient= */ null, buildContainer(builder), builder);
+ }
+
+ /**
+ * Create the {@link ElasticsearchContainer} instance for the given builder.
+ *
+ * <p>This method overrides the wait strategy from the base container using
the same regex, but
+ * increases the timeout to 2 minutes.
+ */
+ private static ElasticsearchContainer buildContainer(Builder builder) {
+ ElasticsearchContainer container =
new ElasticsearchContainer(
-
DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
- builder);
+
DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag));
+
+ //
+ // The regex is based on Elasticsearch container, but it's not exposed
anywhere.
+ String regex = ".*(\"message\":\\s?\"started[\\s?|\"].*|] started\n$)";
+ Duration startupTimeout = Duration.ofMinutes(2);
+ container.setWaitStrategy(
+ new
LogMessageWaitStrategy().withRegEx(regex).withStartupTimeout(startupTimeout));
+
+ return container;
}
@VisibleForTesting
diff --git
a/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManagerTest.java
b/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManagerTest.java
index 2b233c1793a..5778f24e466 100644
---
a/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManagerTest.java
+++
b/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManagerTest.java
@@ -21,6 +21,7 @@ import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -66,6 +67,8 @@ public class ElasticsearchResourceManagerTest {
@Before
public void setUp() {
when(container.getHttpHostAddress()).thenReturn(HOST + ":" + MAPPED_PORT);
+ doReturn(container).when(container).withLogConsumer(any());
+
testManager =
new ElasticsearchResourceManager(
elasticsearchClient, container,
ElasticsearchResourceManager.builder(TEST_ID));
diff --git a/it/google-cloud-platform/build.gradle
b/it/google-cloud-platform/build.gradle
index 320310049b9..f43b3f25720 100644
--- a/it/google-cloud-platform/build.gradle
+++ b/it/google-cloud-platform/build.gradle
@@ -44,26 +44,27 @@ dependencies {
implementation library.java.protobuf_java_util
implementation library.java.protobuf_java
implementation library.java.threetenbp
+ implementation 'org.awaitility:awaitility:4.2.0'
// Google Cloud Dependencies
implementation library.java.google_api_services_bigquery
implementation library.java.google_cloud_core
implementation 'com.google.cloud:google-cloud-storage'
implementation 'com.google.cloud:google-cloud-bigquery'
implementation 'com.google.cloud:google-cloud-monitoring'
- implementation 'com.google.api.grpc:proto-google-cloud-monitoring-v3'
+ provided 'com.google.api.grpc:proto-google-cloud-monitoring-v3'
implementation 'com.google.cloud:google-cloud-bigtable'
implementation 'com.google.cloud:google-cloud-spanner'
implementation 'com.google.cloud:google-cloud-pubsub'
- implementation 'com.google.api.grpc:proto-google-cloud-pubsub-v1'
+ provided 'com.google.api.grpc:proto-google-cloud-pubsub-v1'
implementation 'com.google.cloud:google-cloud-pubsublite'
- implementation 'com.google.api.grpc:proto-google-cloud-pubsublite-v1'
+ provided 'com.google.api.grpc:proto-google-cloud-pubsublite-v1'
implementation 'com.google.cloud:google-cloud-datastore'
implementation 'com.google.cloud:google-cloud-datastream'
- implementation 'com.google.api.grpc:proto-google-cloud-datastream-v1'
+ provided 'com.google.api.grpc:proto-google-cloud-datastream-v1'
implementation 'com.google.cloud:google-cloud-kms'
- implementation 'com.google.api.grpc:proto-google-cloud-kms-v1'
+ provided 'com.google.api.grpc:proto-google-cloud-kms-v1'
implementation 'com.google.cloud:google-cloud-dlp'
- implementation 'com.google.api.grpc:proto-google-cloud-dlp-v2'
+ provided 'com.google.api.grpc:proto-google-cloud-dlp-v2'
implementation 'com.google.cloud:google-cloud-secretmanager'
provided 'com.google.api.grpc:proto-google-cloud-secretmanager-v1'
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
index b22a0234fc6..32f262f2eac 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
@@ -63,7 +63,7 @@ public class IOLoadTestBase extends LoadTestBase {
@Override
PipelineLauncher launcher() {
- return DefaultPipelineLauncher.builder().build();
+ return DefaultPipelineLauncher.builder(CREDENTIALS).build();
}
/** A utility DoFn that counts elements passed through. */
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
index 139a485f90e..f6e359fed96 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
@@ -123,7 +123,7 @@ public abstract class LoadTestBase {
monitoringClient.cleanupAll();
}
- abstract PipelineLauncher launcher() throws IOException;
+ abstract PipelineLauncher launcher();
/**
* Exports the metrics of given dataflow job to BigQuery.
@@ -131,8 +131,7 @@ public abstract class LoadTestBase {
* @param launchInfo Job info of the job
* @param metrics metrics to export
*/
- protected void exportMetricsToBigQuery(LaunchInfo launchInfo, Map<String,
Double> metrics)
- throws IOException {
+ protected void exportMetricsToBigQuery(LaunchInfo launchInfo, Map<String,
Double> metrics) {
LOG.info("Exporting metrics:\n{}", formatForLogging(metrics));
try {
// either use the user specified project for exporting, or the same
project
@@ -140,7 +139,6 @@ public abstract class LoadTestBase {
BigQueryResourceManager bigQueryResourceManager =
BigQueryResourceManager.builder(testName, exportProject, CREDENTIALS)
.setDatasetId(TestProperties.exportDataset())
- .setCredentials(CREDENTIALS)
.build();
// exporting metrics to bigQuery table
Map<String, Object> rowContent = new HashMap<>();
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/ArtifactClient.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/ArtifactClient.java
index 201461f5db8..e463baffeaf 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/ArtifactClient.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/ArtifactClient.java
@@ -53,6 +53,14 @@ public interface ArtifactClient extends ResourceManager {
/** Returns the id associated with the particular run of the test class. */
String runId();
+ /**
+ * Returns the path at which the artifact will be created.
+ *
+ * @param artifactName Artifact name
+ * @return GCS path where the artifact will be created
+ */
+ String getPathForArtifact(String artifactName);
+
/**
* Creates a new artifact in whatever service is being used to store them.
*
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
index c2adea1e0dc..1e6750cc81e 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
@@ -21,6 +21,7 @@ import static
org.apache.beam.it.common.utils.ResourceManagerUtils.checkValidPro
import static
org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.checkValidTableId;
import static
org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateDefaultClusters;
import static
org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateInstanceId;
+import static org.awaitility.Awaitility.await;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.ServerStream;
@@ -37,6 +38,8 @@ import
com.google.cloud.bigtable.admin.v2.models.CreateInstanceRequest;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
import com.google.cloud.bigtable.admin.v2.models.GCRules;
import com.google.cloud.bigtable.admin.v2.models.StorageType;
+import com.google.cloud.bigtable.admin.v2.models.Table;
+import com.google.cloud.bigtable.admin.v2.models.UpdateTableRequest;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.Query;
@@ -49,6 +52,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.it.common.ResourceManager;
import org.apache.commons.lang3.StringUtils;
@@ -72,7 +77,7 @@ import org.threeten.bp.Duration;
public class BigtableResourceManager implements ResourceManager {
private static final Logger LOG =
LoggerFactory.getLogger(BigtableResourceManager.class);
- private static final String DEFAULT_CLUSTER_ZONE = "us-central1-a";
+ private static final String DEFAULT_CLUSTER_ZONE = "us-central1-b";
private static final int DEFAULT_CLUSTER_NUM_NODES = 1;
private static final StorageType DEFAULT_CLUSTER_STORAGE_TYPE =
StorageType.SSD;
@@ -84,6 +89,8 @@ public class BigtableResourceManager implements
ResourceManager {
private final List<String> createdTables;
// List to store created app profiles for static RM
private final List<String> createdAppProfiles;
+ // Set of IDs of the tables for which we enabled CDC
+ private final Set<String> cdcEnabledTables;
private boolean hasInstance;
private final boolean usingStaticInstance;
@@ -103,6 +110,7 @@ public class BigtableResourceManager implements
ResourceManager {
this.projectId = builder.projectId;
this.createdTables = new ArrayList<>();
this.createdAppProfiles = new ArrayList<>();
+ this.cdcEnabledTables = new HashSet<>();
// Check if RM was configured to use static Bigtable instance.
if (builder.useStaticInstance) {
@@ -191,8 +199,10 @@ public class BigtableResourceManager implements
ResourceManager {
// Check to see if instance already exists, and throw error if it does
if (hasInstance) {
- throw new IllegalStateException(
- "Instance " + instanceId + " already exists for project " +
projectId + ".");
+ LOG.warn(
+ "Skipping instance creation. Instance was already created or static
instance was passed. Reusing : {}.",
+ instanceId);
+ return;
}
LOG.info("Creating instance {} in project {}.", instanceId, projectId);
@@ -341,8 +351,22 @@ public class BigtableResourceManager implements
ResourceManager {
createTableRequest.addFamily(
columnFamily,
GCRules.GCRULES.maxAge(bigtableTableSpec.getMaxAge()));
}
- // TODO: Set CDC enabled
+ if (bigtableTableSpec.getCdcEnabled()) {
+ createTableRequest.addChangeStreamRetention(Duration.ofDays(7));
+ cdcEnabledTables.add(tableId);
+ }
tableAdminClient.createTable(createTableRequest);
+
+ await("Waiting for all tables to be replicated.")
+ .atMost(java.time.Duration.ofMinutes(10))
+ .pollInterval(java.time.Duration.ofSeconds(5))
+ .until(
+ () -> {
+ Table t = tableAdminClient.getTable(tableId);
+ Map<String, Table.ReplicationState> rs =
t.getReplicationStatesByClusterId();
+ return
rs.values().stream().allMatch(Table.ReplicationState.READY::equals);
+ });
+
} else {
throw new IllegalStateException(
"Table " + tableId + " already exists for instance " + instanceId
+ ".");
@@ -534,12 +558,26 @@ public class BigtableResourceManager implements
ResourceManager {
public synchronized void cleanupAll() throws
BigtableResourceManagerException {
LOG.info("Attempting to cleanup manager.");
- if (usingStaticInstance) {
- try (BigtableTableAdminClient tableAdminClient =
- bigtableResourceManagerClientFactory.bigtableTableAdminClient()) {
- createdTables.forEach(tableAdminClient::deleteTable);
+ try (BigtableTableAdminClient tableAdminClient =
+ bigtableResourceManagerClientFactory.bigtableTableAdminClient()) {
+ // Change streams must be disabled before table or instance can be
deleted
+ for (String tableId : cdcEnabledTables) {
+
tableAdminClient.updateTable(UpdateTableRequest.of(tableId).disableChangeStreamRetention());
+ }
+
+ if (usingStaticInstance) {
LOG.info(
"This manager was configured to use a static instance that will
not be cleaned up.");
+
+ // Remove managed tables
+ createdTables.forEach(tableAdminClient::deleteTable);
+
+ // Remove managed app profiles
+ try (BigtableInstanceAdminClient instanceAdminClient =
+
bigtableResourceManagerClientFactory.bigtableInstanceAdminClient()) {
+ createdAppProfiles.forEach(
+ profile -> instanceAdminClient.deleteAppProfile(instanceId,
profile, true));
+ }
return;
}
}
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
index 9da7f67d4d7..7918dd6227d 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
@@ -109,8 +109,8 @@ public class DefaultPipelineLauncher extends
AbstractPipelineLauncher {
: new HttpCredentialsAdapter(builder.getCredentials())));
}
- public static Builder builder() {
- return new Builder();
+ public static Builder builder(Credentials credentials) {
+ return new Builder(credentials);
}
@Override
@@ -462,17 +462,14 @@ public class DefaultPipelineLauncher extends
AbstractPipelineLauncher {
public static final class Builder {
private Credentials credentials;
- private Builder() {}
+ private Builder(Credentials credentials) {
+ this.credentials = credentials;
+ }
public Credentials getCredentials() {
return credentials;
}
- public Builder setCredentials(Credentials value) {
- credentials = value;
- return this;
- }
-
public DefaultPipelineLauncher build() {
return new DefaultPipelineLauncher(this);
}
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
index 76d7f32286e..99016b5dd3a 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
@@ -51,7 +51,7 @@ public class DataGenerator {
private final PipelineLauncher pipelineLauncher;
private final PipelineOperator pipelineOperator;
- private DataGenerator(Builder builder) throws IOException {
+ private DataGenerator(Builder builder) {
pipelineLauncher = FlexTemplateClient.builder(CREDENTIALS).build();
pipelineOperator = new PipelineOperator(pipelineLauncher);
this.dataGeneratorOptions =
@@ -249,7 +249,7 @@ public class DataGenerator {
return this;
}
- public DataGenerator build() throws IOException {
+ public DataGenerator build() {
return new DataGenerator(this);
}
}
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/package-info.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/package-info.java
index 04e2dd4b73a..d563025ebeb 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/package-info.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/package-info.java
@@ -15,5 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
/** Data generator for load tests. */
package org.apache.beam.it.gcp.datagenerator;
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/package-info.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/package-info.java
index bcdf04716c5..e68f7b244d9 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/package-info.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/package-info.java
@@ -15,5 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
/** Package for Datastore Truth matchers / subjects to have reusable
assertions. */
package org.apache.beam.it.gcp.datastore.matchers;
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
index 6b6d7194f49..7e1a403c735 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
@@ -110,8 +110,8 @@ public class KMSResourceManager implements ResourceManager {
}
/**
- * Retrieves a KMS crypto key from GCS, creating it if it does not exist. If
the given keyring
- * also does not already exist, it will be created.
+ * Retrieves a KMS crypto key, creating it if it does not exist. If the
given keyring also does
+ * not already exist, it will be created.
*
* @param keyRingId The name of the keyring to insert the key to.
* @param keyName The name of the KMS crypto key to retrieve.
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
index 2d4d9a7d7de..3a684d34c04 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
@@ -28,6 +28,7 @@ import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.ByteString;
+import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.PubsubMessage;
@@ -271,6 +272,7 @@ public final class PubsubResourceManager implements
ResourceManager {
createdSchemas.add(SchemaName.parse(schema.getName()));
topicAdminClient.updateTopic(
UpdateTopicRequest.newBuilder()
+ .setUpdateMask(FieldMask.newBuilder().addPaths("schema_settings"))
.setTopic(
Topic.newBuilder()
.setName(schemaTopic.toString())
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java
index 1c09f5a6f71..730ca23ee54 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java
@@ -84,8 +84,7 @@ public final class GcsResourceManager implements
ArtifactClient, ResourceManager
}
/** Returns a new {@link Builder} for configuring a client. */
- public static Builder builder(String bucket, String testClassName,
Credentials credentials)
- throws IOException {
+ public static Builder builder(String bucket, String testClassName,
Credentials credentials) {
checkArgument(!bucket.equals(""));
checkArgument(!testClassName.equals(""));
@@ -97,6 +96,11 @@ public final class GcsResourceManager implements
ArtifactClient, ResourceManager
return runId;
}
+ @Override
+ public String getPathForArtifact(String artifactName) {
+ return joinPathParts(testClassName, runId, artifactName);
+ }
+
@Override
public Artifact createArtifact(String artifactName, String contents) {
return this.createArtifact(artifactName,
contents.getBytes(StandardCharsets.UTF_8));
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
index b179019a162..03f6e8abfd4 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
@@ -109,11 +109,9 @@ public final class BigQueryIOLT extends IOLoadTestBase {
@Rule public TestPipeline readPipeline = TestPipeline.create();
@BeforeClass
- public static void beforeClass() throws IOException {
+ public static void beforeClass() {
resourceManager =
- BigQueryResourceManager.builder("io-bigquery-lt", project, CREDENTIALS)
- .setCredentials(CREDENTIALS)
- .build();
+ BigQueryResourceManager.builder("io-bigquery-lt", project,
CREDENTIALS).build();
resourceManager.createDataset(region);
}
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManagerTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManagerTest.java
index 53debe3ffa9..e9cd2523875 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManagerTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManagerTest.java
@@ -39,7 +39,6 @@ import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
-import java.io.IOException;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Before;
@@ -85,7 +84,7 @@ public class BigQueryResourceManagerTest {
}
@Test
- public void testGetDatasetIdReturnsCorrectValue() throws IOException {
+ public void testGetDatasetIdReturnsCorrectValue() {
BigQueryResourceManager tm = BigQueryResourceManager.builder(TEST_ID,
PROJECT_ID, null).build();
assertThat(tm.getDatasetId()).matches(TEST_ID.replace('-', '_') +
"_\\d{8}_\\d{6}_\\d{6}");
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
index a0f4d963c0b..65745aea49b 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
@@ -33,11 +33,14 @@ import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.models.StorageType;
+import com.google.cloud.bigtable.admin.v2.models.Table.ReplicationState;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.junit.Before;
import org.junit.Rule;
@@ -95,13 +98,6 @@ public class BigtableResourceManagerTest {
assertThat(rm.getProjectId()).matches(PROJECT_ID);
}
- @Test
- public void
testCreateInstanceShouldThrowExceptionWhenInstanceAlreadyExists() {
- testManager.createInstance(cluster);
-
- assertThrows(IllegalStateException.class, () ->
testManager.createInstance(cluster));
- }
-
@Test
public void
testCreateInstanceShouldThrowExceptionWhenClientFailsToCreateInstance() {
when(bigtableResourceManagerClientFactory.bigtableInstanceAdminClient().createInstance(any()))
@@ -119,20 +115,6 @@ public class BigtableResourceManagerTest {
assertThrows(BigtableResourceManagerException.class, () ->
testManager.createInstance(cluster));
}
- @Test
- public void testCreateInstanceShouldThrowErrorWhenUsingStaticInstance()
throws IOException {
- String instanceId = "static-instance";
- testManager =
- new BigtableResourceManager(
- BigtableResourceManager.builder(TEST_ID, PROJECT_ID, null)
- .setInstanceId(instanceId)
- .useStaticInstance(),
- bigtableResourceManagerClientFactory);
-
- assertThrows(IllegalStateException.class, () ->
testManager.createInstance(cluster));
- assertThat(testManager.getInstanceId()).matches(instanceId);
- }
-
@Test
public void testCreateInstanceShouldWorkWhenBigtableDoesNotThrowAnyError() {
testManager.createInstance(cluster);
@@ -143,6 +125,8 @@ public class BigtableResourceManagerTest {
@Test
public void
testCreateTableShouldNotCreateInstanceWhenInstanceAlreadyExists() {
+ setupReadyTable();
+
testManager.createInstance(cluster);
Mockito.lenient()
.when(
@@ -195,6 +179,7 @@ public class BigtableResourceManagerTest {
@Test
public void
testCreateTableShouldThrowErrorWhenTableAdminClientFailsToClose() {
+ setupReadyTable();
BigtableTableAdminClient mockClient =
bigtableResourceManagerClientFactory.bigtableTableAdminClient();
doThrow(RuntimeException.class).when(mockClient).close();
@@ -206,6 +191,8 @@ public class BigtableResourceManagerTest {
@Test
public void testCreateTableShouldWorkWhenBigtableDoesNotThrowAnyError() {
+ setupReadyTable();
+
when(bigtableResourceManagerClientFactory.bigtableTableAdminClient().exists(anyString()))
.thenReturn(false);
@@ -427,17 +414,33 @@ public class BigtableResourceManagerTest {
when(bigtableResourceManagerClientFactory.bigtableTableAdminClient().exists(anyString()))
.thenReturn(false);
+ setupReadyTable();
+
testManager.createTable(TABLE_ID, ImmutableList.of("cf1"));
testManager.cleanupAll();
verify(bigtableResourceManagerClientFactory.bigtableTableAdminClient()).deleteTable(TABLE_ID);
verify(bigtableResourceManagerClientFactory.bigtableTableAdminClient(),
new Times(1))
.deleteTable(anyString());
- verify(bigtableResourceManagerClientFactory,
never()).bigtableInstanceAdminClient();
+ verify(bigtableResourceManagerClientFactory.bigtableInstanceAdminClient(),
never())
+ .deleteInstance(any());
+ }
+
+ private void setupReadyTable() {
+ Map<String, ReplicationState> allReplicated = new HashMap<>();
+ allReplicated.put(CLUSTER_ID, ReplicationState.READY);
+
+ when(bigtableResourceManagerClientFactory
+ .bigtableTableAdminClient()
+ .getTable(TABLE_ID)
+ .getReplicationStatesByClusterId())
+ .thenReturn(allReplicated);
}
@Test
public void testCleanupAllShouldWorkWhenBigtableDoesNotThrowAnyError() {
+ setupReadyTable();
+
testManager.createTable(TABLE_ID, ImmutableList.of("cf1"));
when(bigtableResourceManagerClientFactory.bigtableTableAdminClient().exists(anyString()))
.thenReturn(true);
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
index 41dc6763738..cfd56e596e5 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
@@ -79,7 +79,7 @@ public final class ClassicTemplateClientTest {
@Captor private ArgumentCaptor<CreateJobFromTemplateRequest> requestCaptor;
@Test
- public void testCreateWithCredentials() throws IOException {
+ public void testCreateWithCredentials() {
Credentials credentials = mock(Credentials.class);
ClassicTemplateClient.builder(credentials).build();
// Lack of exception is all we really can test
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncherTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncherTest.java
index 92d222fee30..b6c0f8cdc58 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncherTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncherTest.java
@@ -41,7 +41,7 @@ public class DefaultPipelineLauncherTest {
@Test
public void testPipelineMetrics() throws IOException {
- DefaultPipelineLauncher launcher =
DefaultPipelineLauncher.builder().build();
+ DefaultPipelineLauncher launcher =
DefaultPipelineLauncher.builder(null).build();
final String timeMetrics = "run_time";
final String counterMetrics = "counter";
final long numElements = 1000L;
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
index ae49972bd06..4088efe6751 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
@@ -81,7 +81,7 @@ public final class FlexTemplateClientTest {
@Captor private ArgumentCaptor<LaunchFlexTemplateRequest> requestCaptor;
@Test
- public void testCreateWithCredentials() throws IOException {
+ public void testCreateWithCredentials() {
Credentials credentials = mock(Credentials.class);
FlexTemplateClient.builder(credentials).build();
// Lack of exception is all we really can test
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastore/DatastoreResourceManagerIT.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastore/DatastoreResourceManagerIT.java
index ccb0ea0bafe..02bf43f3275 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastore/DatastoreResourceManagerIT.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastore/DatastoreResourceManagerIT.java
@@ -20,7 +20,6 @@ package org.apache.beam.it.gcp.datastore;
import static com.google.common.truth.Truth.assertThat;
import com.google.cloud.datastore.Entity;
-import java.io.IOException;
import java.util.List;
import org.apache.beam.it.common.TestProperties;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -30,7 +29,7 @@ import org.junit.Test;
public class DatastoreResourceManagerIT {
@Test
- public void testInsert() throws IOException {
+ public void testInsert() {
DatastoreResourceManager resourceManager =
DatastoreResourceManager.builder(
TestProperties.project(),
@@ -51,7 +50,7 @@ public class DatastoreResourceManagerIT {
}
@Test
- public void testInsertQuery() throws IOException {
+ public void testInsertQuery() {
DatastoreResourceManager resourceManager =
DatastoreResourceManager.builder(
TestProperties.project(),
@@ -75,7 +74,7 @@ public class DatastoreResourceManagerIT {
}
@Test
- public void testInsertCleanUp() throws IOException {
+ public void testInsertCleanUp() {
DatastoreResourceManager resourceManager =
DatastoreResourceManager.builder(
TestProperties.project(),
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastream/DatastreamResourceManagerTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastream/DatastreamResourceManagerTest.java
index cb9e9f8f9bc..d0afe8ae90f 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastream/DatastreamResourceManagerTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastream/DatastreamResourceManagerTest.java
@@ -36,7 +36,6 @@ import com.google.cloud.datastream.v1.SourceConfig;
import com.google.cloud.datastream.v1.Stream;
import com.google.cloud.datastream.v1.Stream.State;
import com.google.cloud.datastream.v1.UpdateStreamRequest;
-import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Rule;
@@ -66,7 +65,7 @@ public class DatastreamResourceManagerTest {
private DatastreamResourceManager testManager;
@Before
- public void setup() throws IOException {
+ public void setup() {
testManager =
new DatastreamResourceManager(
datastreamClient, DatastreamResourceManager.builder(PROJECT_ID,
LOCATION, null));
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerIT.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerIT.java
index 8cbb13b95f6..1dd0a5eda30 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerIT.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerIT.java
@@ -19,7 +19,6 @@ package org.apache.beam.it.gcp.kms;
import static com.google.common.truth.Truth.assertThat;
-import java.io.IOException;
import org.apache.beam.it.gcp.GCPBaseIT;
import org.apache.beam.it.gcp.GoogleCloudIntegrationTest;
import org.apache.commons.lang3.RandomStringUtils;
@@ -41,7 +40,7 @@ public class KMSResourceManagerIT extends GCPBaseIT {
private KMSResourceManager kmsResourceManager;
@Before
- public void setUp() throws IOException {
+ public void setUp() {
kmsResourceManager =
KMSResourceManager.builder(PROJECT,
credentialsProvider).setRegion(KMS_REGION).build();
}
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerTest.java
index 8a0c98cfbf1..a386eb63fc6 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerTest.java
@@ -35,7 +35,6 @@ import com.google.cloud.kms.v1.KeyRing;
import com.google.cloud.kms.v1.KeyRingName;
import com.google.cloud.kms.v1.LocationName;
import com.google.protobuf.ByteString;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -68,7 +67,7 @@ public class KMSResourceManagerTest {
private KMSResourceManager testManager;
@Before
- public void setUp() throws IOException {
+ public void setUp() {
testManager =
new KMSResourceManager(
kmsClientFactory, KMSResourceManager.builder(PROJECT_ID,
null).setRegion(REGION));
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java
index ded5bda4de8..0d3aed34f3a 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java
@@ -196,7 +196,6 @@ public final class SpannerResourceManagerTest {
// arrange
prepareTable();
when(spanner.getDatabaseClient(any()).write(any())).thenReturn(Timestamp.now());
- // spotless:off
Mutation testMutation =
Mutation.newInsertOrUpdateBuilder("SingerId")
.set("SingerId")
@@ -206,7 +205,6 @@ public final class SpannerResourceManagerTest {
.set("LastName")
.to("Richards")
.build();
- // spotless:on
// act
testManager.write(testMutation);
@@ -220,7 +218,6 @@ public final class SpannerResourceManagerTest {
@Test
public void
testWriteSingleRecordShouldThrowExceptionWhenCalledBeforeExecuteDdlStatement() {
// arrange
- // spotless:off
Mutation testMutation =
Mutation.newInsertOrUpdateBuilder("SingerId")
.set("SingerId")
@@ -230,7 +227,6 @@ public final class SpannerResourceManagerTest {
.set("LastName")
.to("Richards")
.build();
- // spotless:on
// act & assert
assertThrows(IllegalStateException.class, () ->
testManager.write(testMutation));
@@ -242,7 +238,6 @@ public final class SpannerResourceManagerTest {
// arrange
prepareTable();
when(spanner.getDatabaseClient(any()).write(any())).thenThrow(SpannerException.class);
- // spotless:off
Mutation testMutation =
Mutation.newInsertOrUpdateBuilder("SingerId")
.set("SingerId")
@@ -252,7 +247,6 @@ public final class SpannerResourceManagerTest {
.set("LastName")
.to("Richards")
.build();
- // spotless:on
// act & assert
assertThrows(SpannerResourceManagerException.class, () ->
testManager.write(testMutation));
@@ -264,7 +258,6 @@ public final class SpannerResourceManagerTest {
// arrange
prepareTable();
when(spanner.getDatabaseClient(any()).write(any())).thenReturn(Timestamp.now());
- // spotless:off
ImmutableList<Mutation> testMutations =
ImmutableList.of(
Mutation.newInsertOrUpdateBuilder("SingerId")
@@ -283,7 +276,6 @@ public final class SpannerResourceManagerTest {
.set("LastName")
.to("Smith")
.build());
- // spotless:on
// act
testManager.write(testMutations);
@@ -298,7 +290,6 @@ public final class SpannerResourceManagerTest {
@Test
public void
testWriteMultipleRecordsShouldThrowExceptionWhenCalledBeforeExecuteDdlStatement()
{
// arrange
- // spotless:off
ImmutableList<Mutation> testMutations =
ImmutableList.of(
Mutation.newInsertOrUpdateBuilder("SingerId")
@@ -317,7 +308,6 @@ public final class SpannerResourceManagerTest {
.set("LastName")
.to("Smith")
.build());
- // spotless:on
// act & assert
assertThrows(IllegalStateException.class, () ->
testManager.write(testMutations));
@@ -329,7 +319,6 @@ public final class SpannerResourceManagerTest {
// arrange
prepareTable();
when(spanner.getDatabaseClient(any()).write(any())).thenThrow(SpannerException.class);
- // spotless:off
ImmutableList<Mutation> testMutations =
ImmutableList.of(
Mutation.newInsertOrUpdateBuilder("SingerId")
@@ -348,7 +337,6 @@ public final class SpannerResourceManagerTest {
.set("LastName")
.to("Smith")
.build());
- // spotless:on
// act & assert
assertThrows(SpannerResourceManagerException.class, () ->
testManager.write(testMutations));
@@ -360,7 +348,6 @@ public final class SpannerResourceManagerTest {
// arrange
prepareTable();
when(resultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
- // spotless:off
Struct struct1 =
Struct.newBuilder()
.set("SingerId")
@@ -379,7 +366,6 @@ public final class SpannerResourceManagerTest {
.set("LastName")
.to(string("Smith"))
.build();
- // spotless:on
when(resultSet.getCurrentRowAsStruct()).thenReturn(struct1).thenReturn(struct2);
when(spanner.getDatabaseClient(any()).singleUse().read(any(), any(),
any()))
.thenReturn(resultSet);
@@ -399,7 +385,6 @@ public final class SpannerResourceManagerTest {
// arrange
prepareTable();
when(resultSet.next()).thenReturn(true).thenReturn(false);
- // spotless:off
Struct struct =
Struct.newBuilder()
.set("SingerId")
@@ -409,7 +394,6 @@ public final class SpannerResourceManagerTest {
.set("LastName")
.to(string("Richards"))
.build();
- // spotless:on
when(resultSet.getCurrentRowAsStruct()).thenReturn(struct);
when(spanner.getDatabaseClient(any()).singleUse().read(any(), any(),
any()))
.thenReturn(resultSet);
@@ -494,7 +478,6 @@ public final class SpannerResourceManagerTest {
+ " FirstName STRING(1024),\n"
+ " LastName STRING(1024),\n"
+ ") PRIMARY KEY (SingerId)";
- // spotless:off
Mutation testMutation =
Mutation.newInsertOrUpdateBuilder("SingerId")
.set("SingerId")
@@ -504,7 +487,6 @@ public final class SpannerResourceManagerTest {
.set("LastName")
.to("Richards")
.build();
- // spotless:on
ImmutableList<String> columnNames = ImmutableList.of("SingerId");
// act & assert
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
index 8de05ddf18c..fd1bc1772f2 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
@@ -117,7 +117,7 @@ public class FileBasedIOLT extends IOLoadTestBase {
}
@BeforeClass
- public static void beforeClass() throws IOException {
+ public static void beforeClass() {
resourceManager =
GcsResourceManager.builder(TestProperties.artifactBucket(),
"textiolt", CREDENTIALS)
.build();
diff --git
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MSSQLResourceManagerTest.java
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MSSQLResourceManagerTest.java
index 40c8fc32ae0..0ed192bf3c4 100644
---
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MSSQLResourceManagerTest.java
+++
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MSSQLResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.jdbc;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import org.junit.Before;
@@ -53,6 +54,7 @@ public class MSSQLResourceManagerTest<
when(container.withPassword(any())).thenReturn(container);
when(container.withDatabaseName(anyString())).thenReturn(container);
when(container.getDatabaseName()).thenReturn(DATABASE_NAME);
+ doReturn(container).when(container).withLogConsumer(any());
testManager = new MSSQLResourceManager(container, new
MSSQLResourceManager.Builder(TEST_ID));
}
diff --git
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MySQLResourceManagerTest.java
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MySQLResourceManagerTest.java
index 0f90dfccb71..402c45ec8ce 100644
---
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MySQLResourceManagerTest.java
+++
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MySQLResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.jdbc;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import org.junit.Before;
@@ -49,6 +50,7 @@ public class MySQLResourceManagerTest<T extends
MySQLContainer<T>> {
when(container.withUsername(any())).thenReturn(container);
when(container.withPassword(any())).thenReturn(container);
when(container.withDatabaseName(anyString())).thenReturn(container);
+ doReturn(container).when(container).withLogConsumer(any());
testManager = new MySQLResourceManager(container, new
MySQLResourceManager.Builder(TEST_ID));
}
diff --git
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/OracleResourceManagerTest.java
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/OracleResourceManagerTest.java
index 6bf7b71c311..a2da44d7849 100644
---
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/OracleResourceManagerTest.java
+++
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/OracleResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.jdbc;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import org.junit.Before;
@@ -50,6 +51,7 @@ public class OracleResourceManagerTest<T extends
OracleContainer> {
when(container.withUsername(any())).thenReturn(container);
when(container.withPassword(any())).thenReturn(container);
when(container.withDatabaseName(anyString())).thenReturn(container);
+ doReturn(container).when(container).withLogConsumer(any());
testManager = new OracleResourceManager(container, new
OracleResourceManager.Builder(TEST_ID));
}
diff --git
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/PostgresResourceManagerTest.java
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/PostgresResourceManagerTest.java
index 3d939758c2f..a8a29816a6e 100644
---
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/PostgresResourceManagerTest.java
+++
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/PostgresResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.jdbc;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import org.junit.Before;
@@ -49,6 +50,7 @@ public class PostgresResourceManagerTest<T extends
PostgreSQLContainer<T>> {
when(container.withUsername(any())).thenReturn(container);
when(container.withPassword(any())).thenReturn(container);
when(container.withDatabaseName(anyString())).thenReturn(container);
+ doReturn(container).when(container).withLogConsumer(any());
testManager =
new PostgresResourceManager(container, new
PostgresResourceManager.Builder(TEST_ID));
}
diff --git
a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
index e32f73b70f6..d9a647dbeeb 100644
--- a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
+++ b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
@@ -19,7 +19,6 @@ package org.apache.beam.it.kafka;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -103,7 +102,7 @@ public class KafkaResourceManager extends
TestContainerResourceManager<GenericCo
: AdminClient.create(ImmutableMap.of("bootstrap.servers",
this.connectionString));
}
- public static Builder builder(String testId) throws IOException {
+ public static Builder builder(String testId) {
return new Builder(testId);
}
diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
index 0fd50e6989a..a03030664de 100644
--- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
@@ -69,7 +69,7 @@ public final class KafkaIOLT extends IOLoadTestBase {
@Rule public TestPipeline readPipeline = TestPipeline.create();
@BeforeClass
- public static void beforeClass() throws IOException {
+ public static void beforeClass() {
resourceManager = KafkaResourceManager.builder("io-kafka-lt").build();
}
diff --git
a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java
b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java
index d830e3d3e67..8c870815efc 100644
---
a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java
+++
b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -65,6 +66,7 @@ public final class KafkaResourceManagerTest {
@Before
public void setUp() throws IOException {
+ doReturn(container).when(container).withLogConsumer(any());
testManager =
new KafkaResourceManager(kafkaClient, container,
KafkaResourceManager.builder(TEST_ID));
}
diff --git
a/it/mongodb/src/test/java/org/apache/beam/it/mongodb/MongoDBResourceManagerTest.java
b/it/mongodb/src/test/java/org/apache/beam/it/mongodb/MongoDBResourceManagerTest.java
index 6979c168de8..b3ad34b70ff 100644
---
a/it/mongodb/src/test/java/org/apache/beam/it/mongodb/MongoDBResourceManagerTest.java
+++
b/it/mongodb/src/test/java/org/apache/beam/it/mongodb/MongoDBResourceManagerTest.java
@@ -21,6 +21,7 @@ import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -65,6 +66,7 @@ public class MongoDBResourceManagerTest {
@Before
public void setUp() {
+ doReturn(container).when(container).withLogConsumer(any());
testManager =
new MongoDBResourceManager(mongoClient, container,
MongoDBResourceManager.builder(TEST_ID));
}
diff --git
a/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java
b/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java
index 26042eca023..5014505ece9 100644
---
a/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java
+++
b/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.startsWith;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -68,6 +69,7 @@ public class Neo4jResourceManagerTest {
when(container.getMappedPort(NEO4J_BOLT_PORT)).thenReturn(MAPPED_PORT);
when(session.run(anyString(), anyMap())).thenReturn(result);
when(neo4jDriver.session(any())).thenReturn(session);
+ doReturn(container).when(container).withLogConsumer(any());
testManager =
new Neo4jResourceManager(neo4jDriver, container,
Neo4jResourceManager.builder(TEST_ID));
diff --git
a/it/splunk/src/main/java/org/apache/beam/it/splunk/conditions/package-info.java
b/it/splunk/src/main/java/org/apache/beam/it/splunk/conditions/package-info.java
index a291d65b7a6..d4ae6db311a 100644
---
a/it/splunk/src/main/java/org/apache/beam/it/splunk/conditions/package-info.java
+++
b/it/splunk/src/main/java/org/apache/beam/it/splunk/conditions/package-info.java
@@ -1,17 +1,19 @@
/*
- * Copyright (C) 2023 Google LLC
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
under
- * the License.
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
/** Package that contains reusable Splunk conditions. */
diff --git
a/it/splunk/src/main/java/org/apache/beam/it/splunk/matchers/package-info.java
b/it/splunk/src/main/java/org/apache/beam/it/splunk/matchers/package-info.java
index 92777c6ab7c..f56eff15599 100644
---
a/it/splunk/src/main/java/org/apache/beam/it/splunk/matchers/package-info.java
+++
b/it/splunk/src/main/java/org/apache/beam/it/splunk/matchers/package-info.java
@@ -1,17 +1,19 @@
/*
- * Copyright (C) 2023 Google LLC
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
under
- * the License.
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
/** Package for Splunk Truth matchers / subjects to have reusable assertions.
*/
diff --git
a/it/splunk/src/main/java/org/apache/beam/it/splunk/package-info.java
b/it/splunk/src/main/java/org/apache/beam/it/splunk/package-info.java
index 858500edc82..6097113cd70 100644
--- a/it/splunk/src/main/java/org/apache/beam/it/splunk/package-info.java
+++ b/it/splunk/src/main/java/org/apache/beam/it/splunk/package-info.java
@@ -1,17 +1,19 @@
/*
- * Copyright (C) 2023 Google LLC
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
under
- * the License.
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
/** Package for managing Splunk resources within integration tests. */
diff --git
a/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerTest.java
b/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerTest.java
index eeada3d0838..ed97bd9f367 100644
---
a/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerTest.java
+++
b/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerTest.java
@@ -21,6 +21,7 @@ import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -79,6 +80,7 @@ public class SplunkResourceManagerTest {
when(container.withPassword(anyString())).thenReturn(container);
when(container.getMappedPort(DEFAULT_SPLUNKD_INTERNAL_PORT))
.thenReturn(MAPPED_SPLUNKD_INTERNAL_PORT);
+ doReturn(container).when(container).withLogConsumer(any());
testManager =
new SplunkResourceManager(clientFactory, container,
SplunkResourceManager.builder(TEST_ID));
diff --git a/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerUtilsTest.java b/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerUtilsTest.java
index 2f27340b6ae..b4aaec3b45c 100644
--- a/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerUtilsTest.java
+++ b/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerUtilsTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/** Unit tests for {@link SplunkResourceManager}. */
+/** Unit tests for {@link SplunkResourceManagerUtils}. */
@RunWith(JUnit4.class)
public class SplunkResourceManagerUtilsTest {
diff --git a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
index b0279303866..098938a291d 100644
--- a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
+++ b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
@@ -68,7 +68,12 @@ public abstract class TestContainerResourceManager<T extends GenericContainer<?>
}
if (!usingStaticContainer) {
- container.start();
+ // TODO(pranavbhandari): Change this to use log.getUtf8StringWithoutLineEnding() when
+ // testcontainers dependency is updated.
+ container
+ .withLogConsumer(
+ log -> LOG.info("{}: {}", container.getDockerImageName(),
log.getUtf8String()))
+ .start();
} else if (builder.host == null || builder.port < 0) {
throw new TestContainerResourceManagerException(
"This manager was configured to use a static resource, but the host
and port were not properly set.");
diff --git a/it/testcontainers/src/test/java/org/apache/beam/it/testcontainers/TestContainerResourceManagerTest.java b/it/testcontainers/src/test/java/org/apache/beam/it/testcontainers/TestContainerResourceManagerTest.java
index ad79d6ea848..7e2d686d28a 100644
--- a/it/testcontainers/src/test/java/org/apache/beam/it/testcontainers/TestContainerResourceManagerTest.java
+++ b/it/testcontainers/src/test/java/org/apache/beam/it/testcontainers/TestContainerResourceManagerTest.java
@@ -19,7 +19,9 @@ package org.apache.beam.it.testcontainers;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -54,6 +56,7 @@ public class TestContainerResourceManagerTest {
testManagerBuilder =
new
TestContainerResourceManager.Builder<TestContainerResourceManagerImpl>(
TEST_ID, null, null) {
+
@Override
public TestContainerResourceManagerImpl build() {
return new TestContainerResourceManagerImpl(container, this);
@@ -64,6 +67,7 @@ public class TestContainerResourceManagerTest {
@Test
public void testCreateResourceManagerSetsCorrectDockerImageName() {
when(container.getDockerImageName()).thenReturn("container-test:test");
+ doReturn(container).when(container).withLogConsumer(any());
testManagerBuilder.setContainerImageName("container-test").setContainerImageTag("test").build();
@@ -74,6 +78,7 @@ public class TestContainerResourceManagerTest {
@Test
public void
testCreateResourceManagerShouldStartContainerWhenNotUsingStaticResource() {
+ doReturn(container).when(container).withLogConsumer(any());
testManagerBuilder.build();
verify(container).start();
@@ -110,6 +115,7 @@ public class TestContainerResourceManagerTest {
@Test
public void testGetHostShouldReturnCorrectHostWhenManuallySet() {
+ doReturn(container).when(container).withLogConsumer(any());
TestContainerResourceManager<?> testManager =
testManagerBuilder.setHost(HOST).build();
assertThat(testManager.getHost()).matches(HOST);
@@ -117,6 +123,7 @@ public class TestContainerResourceManagerTest {
@Test
public void testGetHostShouldReturnCorrectHostWhenHostNotSet() {
+ doReturn(container).when(container).withLogConsumer(any());
String host = TestProperties.hostIp();
TestContainerResourceManager<?> testManager = testManagerBuilder.build();
@@ -125,6 +132,7 @@ public class TestContainerResourceManagerTest {
@Test
public void testGetPortShouldReturnCorrectPortWhenManuallySet() {
+ doReturn(container).when(container).withLogConsumer(any());
TestContainerResourceManager<?> testManager =
testManagerBuilder.setHost(HOST).setPort(PORT).build();
@@ -135,6 +143,7 @@ public class TestContainerResourceManagerTest {
public void testGetPortShouldReturnContainerHostWhenPortNotSet() {
int mappedPort = 5000;
when(container.getMappedPort(anyInt())).thenReturn(mappedPort);
+ doReturn(container).when(container).withLogConsumer(any());
TestContainerResourceManager<?> testManager = testManagerBuilder.build();
@@ -143,6 +152,7 @@ public class TestContainerResourceManagerTest {
@Test
public void testCleanupAllShouldCloseContainerWhenNotUsingStaticResource() {
+ doReturn(container).when(container).withLogConsumer(any());
TestContainerResourceManager<?> testManager = testManagerBuilder.build();
testManager.cleanupAll();
@@ -152,6 +162,7 @@ public class TestContainerResourceManagerTest {
@Test
public void testCleanupAllShouldReturnFalseWhenContainerFailsToClose() {
doThrow(RuntimeException.class).when(container).close();
+ doReturn(container).when(container).withLogConsumer(any());
TestContainerResourceManager<?> testManager = testManagerBuilder.build();