This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f5c0d16c680 Update `it` module. (#29232) f5c0d16c680 is described below commit f5c0d16c6800f766c8ccc75219b341c76cf87da1 Author: Pranav Bhandari <bhandari.prana...@gmail.com> AuthorDate: Tue Oct 31 19:04:15 2023 -0400 Update `it` module. (#29232) --- .../cassandra/CassandraResourceManagerUtils.java | 2 +- .../it/cassandra/matchers/CassandraAsserts.java | 2 +- .../it/cassandra/CassandraResourceManagerTest.java | 3 +- .../java/org/apache/beam/it/gcp/LoadTestBase.java | 10 +- .../beam/it/gcp/artifacts/utils/JsonTestUtil.java | 149 +++++++++++ .../it/gcp/dataflow/DefaultPipelineLauncher.java | 6 +- .../beam/it/gcp/dataflow/DirectRunnerClient.java | 6 +- .../beam/it/gcp/datagenerator/DataGenerator.java | 54 ++-- .../gcp/datastore/matchers/DatastoreAsserts.java | 3 +- .../apache/beam/it/gcp/dlp/DlpResourceManager.java | 5 +- .../apache/beam/it/gcp/kms/KMSResourceManager.java | 5 +- .../beam/it/gcp/monitoring/MonitoringClient.java | 12 +- .../it/gcp/spanner/matchers/SpannerAsserts.java | 45 ++++ .../apache/beam/it/gcp/bigtable/BigTableIOLT.java | 18 +- .../apache/beam/it/gcp/spanner/SpannerIOLT.java | 285 +++++++++++++++++++++ .../apache/beam/it/gcp/storage/FileBasedIOLT.java | 2 +- .../beam/it/jdbc/AbstractJDBCResourceManager.java | 63 ++--- .../apache/beam/it/jdbc/JDBCResourceManager.java | 11 +- .../apache/beam/it/jdbc/MSSQLResourceManager.java | 7 +- .../apache/beam/it/jdbc/MySQLResourceManager.java | 4 +- .../apache/beam/it/jdbc/OracleResourceManager.java | 8 +- .../beam/it/jdbc/PostgresResourceManager.java | 20 +- .../apache/beam/it/kafka/KafkaResourceManager.java | 11 +- .../beam/it/mongodb/MongoDBResourceManager.java | 10 +- .../beam/it/mongodb/matchers/MongoDBAsserts.java | 2 +- .../apache/beam/it/neo4j/Neo4jResourceManager.java | 11 +- .../beam/it/neo4j/conditions/Neo4jQueryCheck.java | 15 +- .../beam/it/splunk/SplunkResourceManager.java | 10 +- .../TestContainerResourceManager.java | 15 +- .../beam/it/truthmatchers/RecordsSubject.java | 2 +- 30 files changed, 638 insertions(+), 158 deletions(-) diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java index ef617de518b..f0180076378 100644 --- a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java +++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java @@ -30,7 +30,7 @@ final class CassandraResourceManagerUtils { Pattern.compile("[/\\\\. \"\0$]"); // i.e. [/\. "$] private static final String REPLACE_DATABASE_NAME_CHAR = "-"; private static final DateTimeFormatter TIME_FORMAT = - DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss"); + DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss-SSSSSS"); private CassandraResourceManagerUtils() {} diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java index 6aecc6609cf..61f730bf357 100644 --- a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java +++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java @@ -31,7 +31,7 @@ import org.apache.beam.it.truthmatchers.RecordsSubject; public class CassandraAsserts { /** - * Convert Cassandra {@link com.datastax.oss.driver.api.core.cql.Row} list to a list of maps. + * Convert Cassandra {@link Row} list to a list of maps. * * @param rows Rows to parse. * @return List of maps to use in {@link RecordsSubject}. diff --git a/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java index fe00457159f..318ef6d76c6 100644 --- a/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java +++ b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java @@ -72,7 +72,8 @@ public class CassandraResourceManagerTest { @Test public void testGetKeyspaceNameShouldReturnCorrectValue() { - assertThat(testManager.getKeyspaceName()).matches(TEST_ID.replace('-', '_') + "_\\d{8}_\\d{6}"); + assertThat(testManager.getKeyspaceName()) + .matches(TEST_ID.replace('-', '_') + "_\\d{8}_\\d{6}_\\d{6}"); } @Test diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java index 14bb05394de..44a439b0ce9 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java @@ -516,20 +516,20 @@ public abstract class LoadTestBase { public abstract @Nullable String outputPCollectionV2(); - public static MetricsConfiguration.Builder builder() { + public static Builder builder() { return new AutoValue_LoadTestBase_MetricsConfiguration.Builder(); } @AutoValue.Builder public abstract static class Builder { - public abstract MetricsConfiguration.Builder setInputPCollection(@Nullable String value); + public abstract Builder setInputPCollection(@Nullable String value); - public abstract MetricsConfiguration.Builder setInputPCollectionV2(@Nullable String value); + public abstract Builder setInputPCollectionV2(@Nullable String value); - public abstract MetricsConfiguration.Builder setOutputPCollection(@Nullable String value); + public abstract Builder setOutputPCollection(@Nullable String value); - public abstract MetricsConfiguration.Builder setOutputPCollectionV2(@Nullable String value); + public abstract Builder setOutputPCollectionV2(@Nullable String value); public abstract MetricsConfiguration build(); } diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/utils/JsonTestUtil.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/utils/JsonTestUtil.java index 1ef12d33fa1..9a83558f7bf 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/utils/JsonTestUtil.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/utils/JsonTestUtil.java @@ -18,13 +18,22 @@ package org.apache.beam.it.gcp.artifacts.utils; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; /** * The {@link JsonTestUtil} class provides common utilities used for executing tests that involve @@ -56,6 +65,67 @@ public class JsonTestUtil { return records; } + /** + * Reads NDJSON (Newline Delimited JSON) data from a byte array and returns a list of parsed JSON + * objects. Each JSON object is represented as a Map of String keys to Object values. + * + * @param jsonBytes A byte array containing NDJSON data. + * @return A list of parsed JSON objects as {@code Map<String, Object>}. + * @throws IOException if there's an issue reading or parsing the data. + */ + public static List<Map<String, Object>> readNDJSON(byte[] jsonBytes) throws IOException { + try (ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonBytes)) { + InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8); + JsonMapper mapper = new JsonMapper(); + + return new BufferedReader(reader) + .lines() + .map( + line -> { + try { + // Deserialize each line as a Map<String, Object> + return mapper.readValue(line, mapTypeRef); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + } + } + + /** + * Recursively sorts the keys of a nested JSON represented as a Map. + * + * @param jsonMap A {@code Map<String, Object>} representing the nested JSON. + * @return A sorted {@code Map<String, Object>} where the keys are sorted in natural order. + */ + public static Map<String, Object> sortJsonMap(Map<String, Object> jsonMap) { + return jsonMap.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> { + Object value = entry.getValue(); + if (value instanceof Map) { + return sortJsonMap((Map<String, Object>) value); + } else if (value instanceof List) { + return ((List<Object>) value) + .stream() + .map( + item -> + item instanceof Map + ? sortJsonMap((Map<String, Object>) item) + : item) + .collect(Collectors.toList()); + } else { + return value; + } + }, + (a, b) -> a, // Merge function (not needed for a TreeMap) + TreeMap::new // Resulting map is a TreeMap + )); + } + /** * Read JSON records to a list of Maps. * @@ -86,4 +156,83 @@ public class JsonTestUtil { public static Map<String, Object> readRecord(String contents) throws IOException { return readRecord(contents.getBytes(StandardCharsets.UTF_8)); } + + /** + * Parses a JSON string and returns either a List of Maps or a Map, depending on whether the JSON + * represents an array or an object. + * + * @param jsonString The JSON string to parse. + * @return A List of Maps if the JSON is an array, or a Map if it's an object. + * @throws IOException If there's an error while parsing the JSON string. + */ + public static Object parseJsonString(String jsonString) throws IOException { + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode jsonNode = objectMapper.readTree(jsonString); + if (jsonNode.isArray()) { + return parseJsonArray((ArrayNode) jsonNode); + } else if (jsonNode.isObject()) { + return parseJsonObject(jsonNode); + } else { + throw new IllegalArgumentException("Input is not a valid JSON object or array."); + } + } + + /** + * Parses a JSON array represented by an ArrayNode and returns a List of Maps. + * + * @param arrayNode The JSON array to parse. + * @return A List of Maps containing the parsed data. + */ + private static List<Object> parseJsonArray(ArrayNode arrayNode) { + List<Object> result = new ArrayList<>(); + for (JsonNode element : arrayNode) { + if (element.isObject()) { + result.add(parseJsonObject(element)); + } else { + result.add(parseSimpleNode(element)); + } + } + return result; + } + + /** + * Parses a JSON object represented by a JsonNode and returns a Map. + * + * @param objectNode The JSON object to parse. + * @return A Map containing the parsed data. + */ + private static Map<String, Object> parseJsonObject(JsonNode objectNode) { + Map<String, Object> result = new HashMap<>(); + objectNode + .fields() + .forEachRemaining( + entry -> { + String key = entry.getKey(); + JsonNode value = entry.getValue(); + if (value.isObject()) { + result.put(key, parseJsonObject(value)); + } else if (value.isArray()) { + result.put(key, parseJsonArray((ArrayNode) value)); + } else { + result.put(key, parseSimpleNode(value)); + } + }); + return result; + } + + /** Parse following value from JSON node: text, number, boolean, null. */ + @SuppressWarnings("nullness") + private static Object parseSimpleNode(JsonNode element) { + if (element.isTextual()) { + return element.asText(); + } else if (element.isNumber()) { + return element.numberValue(); + } else if (element.isBoolean()) { + return element.asBoolean(); + } else if (element.isNull()) { + return null; + } else { + throw new IllegalArgumentException("Element is not a valid JSON object or array."); + } + } } diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java index 3d43618821a..620d24d4e11 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java @@ -99,7 +99,7 @@ public class DefaultPipelineLauncher extends AbstractPipelineLauncher { .put(PipelineResult.State.UNRECOGNIZED, JobState.UNKNOWN) .build(); - private DefaultPipelineLauncher(DefaultPipelineLauncher.Builder builder) { + private DefaultPipelineLauncher(Builder builder) { super( new Dataflow( Utils.getDefaultTransport(), @@ -109,8 +109,8 @@ public class DefaultPipelineLauncher extends AbstractPipelineLauncher { : new HttpCredentialsAdapter(builder.getCredentials()))); } - public static DefaultPipelineLauncher.Builder builder(Credentials credentials) { - return new DefaultPipelineLauncher.Builder(credentials); + public static Builder builder(Credentials credentials) { + return new Builder(credentials); } @Override diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java index 57f8ad40c1b..8017009ff37 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java @@ -53,8 +53,8 @@ public class DirectRunnerClient implements PipelineLauncher { this.mainClass = builder.getMainClass(); } - public static DirectRunnerClient.Builder builder(Class<?> mainClass) { - return new DirectRunnerClient.Builder(mainClass); + public static Builder builder(Class<?> mainClass) { + return new Builder(mainClass); } @Override @@ -172,7 +172,7 @@ public class DirectRunnerClient implements PipelineLauncher { return mainClass; } - public DirectRunnerClient.Builder setCredentials(Credentials value) { + public Builder setCredentials(Credentials value) { credentials = value; return this; } diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java index 832a75defd9..99016b5dd3a 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java @@ -61,16 +61,14 @@ public class DataGenerator { .build(); } - public static DataGenerator.Builder builderWithSchemaLocation( - String testName, String schemaLocation) { - return new DataGenerator.Builder(testName + "-data-generator") + public static Builder builderWithSchemaLocation(String testName, String schemaLocation) { + return new Builder(testName + "-data-generator") .setSchemaLocation(schemaLocation) .setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED); } - public static DataGenerator.Builder builderWithSchemaTemplate( - String testName, String schemaTemplate) { - return new DataGenerator.Builder(testName + "-data-generator") + public static Builder builderWithSchemaTemplate(String testName, String schemaTemplate) { + return new Builder(testName + "-data-generator") .setSchemaTemplate(schemaTemplate) .setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED); } @@ -131,27 +129,27 @@ public class DataGenerator { return parameters; } - public DataGenerator.Builder setSchemaTemplate(String value) { + public Builder setSchemaTemplate(String value) { parameters.put("schemaTemplate", value); return this; } - public DataGenerator.Builder setSchemaLocation(String value) { + public Builder setSchemaLocation(String value) { parameters.put("schemaLocation", value); return this; } - public DataGenerator.Builder setMessagesLimit(String value) { + public Builder setMessagesLimit(String value) { parameters.put(MESSAGES_LIMIT, value); return this; } - public DataGenerator.Builder setQPS(String value) { + public Builder setQPS(String value) { parameters.put("qps", value); return this; } - public DataGenerator.Builder setSinkType(String value) { + public Builder setSinkType(String value) { parameters.put("sinkType", value); return this; } @@ -166,87 +164,87 @@ public class DataGenerator { return this; } - public DataGenerator.Builder setMaxNumWorkers(String value) { + public Builder setMaxNumWorkers(String value) { parameters.put("maxNumWorkers", value); return this; } - public DataGenerator.Builder setAutoscalingAlgorithm(AutoscalingAlgorithmType value) { + public Builder setAutoscalingAlgorithm(AutoscalingAlgorithmType value) { parameters.put("autoscalingAlgorithm", value.toString()); return this; } - public DataGenerator.Builder setOutputDirectory(String value) { + public Builder setOutputDirectory(String value) { parameters.put("outputDirectory", value); return this; } - public DataGenerator.Builder setOutputType(String value) { + public Builder setOutputType(String value) { parameters.put("outputType", value); return this; } - public DataGenerator.Builder setNumShards(String value) { + public Builder setNumShards(String value) { parameters.put("numShards", value); return this; } - public DataGenerator.Builder setAvroSchemaLocation(String value) { + public Builder setAvroSchemaLocation(String value) { parameters.put("avroSchemaLocation", value); return this; } - public DataGenerator.Builder setTopic(String value) { + public Builder setTopic(String value) { parameters.put("topic", value); return this; } - public DataGenerator.Builder setProjectId(String value) { + public Builder setProjectId(String value) { parameters.put("projectId", value); return this; } - public DataGenerator.Builder setSpannerInstanceName(String value) { + public Builder setSpannerInstanceName(String value) { parameters.put("spannerInstanceName", value); return this; } - public DataGenerator.Builder setSpannerDatabaseName(String value) { + public Builder setSpannerDatabaseName(String value) { parameters.put("spannerDatabaseName", value); return this; } - public DataGenerator.Builder setSpannerTableName(String value) { + public Builder setSpannerTableName(String value) { parameters.put("spannerTableName", value); return this; } - public DataGenerator.Builder setDriverClassName(String value) { + public Builder setDriverClassName(String value) { parameters.put("driverClassName", value); return this; } - public DataGenerator.Builder setConnectionUrl(String value) { + public Builder setConnectionUrl(String value) { parameters.put("connectionUrl", value); return this; } - public DataGenerator.Builder setUsername(String value) { + public Builder setUsername(String value) { parameters.put("username", value); return this; } - public DataGenerator.Builder setPassword(String value) { + public Builder setPassword(String value) { parameters.put("password", value); return this; } - public DataGenerator.Builder setConnectionProperties(String value) { + public Builder setConnectionProperties(String value) { parameters.put("connectionProperties", value); return this; } - public DataGenerator.Builder setStatement(String value) { + public Builder setStatement(String value) { parameters.put("statement", value); return this; } diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java index 78fa7543150..ef67a5a5c4f 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java @@ -61,8 +61,7 @@ public class DatastoreAsserts { /** * Creates a {@link RecordsSubject} to assert information within a list of records. * - * @param results Records in Datastore {@link com.google.cloud.datastore.Entity} format to use in - * the comparison. + * @param results Records in Datastore {@link Entity} format to use in the comparison. * @return Truth subject to chain assertions. */ public static RecordsSubject assertThatDatastoreRecords(Collection<Entity> results) { diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java index de818a1bbff..f59794af3e1 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java @@ -113,9 +113,8 @@ public class DlpResourceManager implements ResourceManager { * @param project the GCP project ID * @return a new instance of Builder */ - public static DlpResourceManager.Builder builder( - String project, CredentialsProvider credentialsProvider) { - return new DlpResourceManager.Builder(project, credentialsProvider); + public static Builder builder(String project, CredentialsProvider credentialsProvider) { + return new Builder(project, credentialsProvider); } /** A builder class for creating instances of {@link DlpResourceManager}. */ diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java index 2cad6d0b9fa..7e1a403c735 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java @@ -72,9 +72,8 @@ public class KMSResourceManager implements ResourceManager { this.keyRing = null; } - public static KMSResourceManager.Builder builder( - String projectId, CredentialsProvider credentialsProvider) { - return new KMSResourceManager.Builder(projectId, credentialsProvider); + public static Builder builder(String projectId, CredentialsProvider credentialsProvider) { + return new Builder(projectId, credentialsProvider); } /** diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java index 06591ea4fe0..0fc5614a363 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java @@ -150,8 +150,8 @@ public final class MonitoringClient { Aggregation aggregation = Aggregation.newBuilder() .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build()) - .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN) - .setCrossSeriesReducer(Aggregation.Reducer.REDUCE_MEAN) + .setPerSeriesAligner(Aligner.ALIGN_MEAN) + .setCrossSeriesReducer(Reducer.REDUCE_MEAN) .addGroupByFields("resource.instance_id") .build(); ListTimeSeriesRequest request = @@ -188,7 +188,7 @@ public final class MonitoringClient { Aggregation aggregation = Aggregation.newBuilder() .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build()) - .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN) + .setPerSeriesAligner(Aligner.ALIGN_MEAN) .setCrossSeriesReducer(Reducer.REDUCE_MAX) .build(); ListTimeSeriesRequest request = @@ -225,7 +225,7 @@ public final class MonitoringClient { Aggregation aggregation = Aggregation.newBuilder() .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build()) - .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN) + .setPerSeriesAligner(Aligner.ALIGN_MEAN) .setCrossSeriesReducer(Reducer.REDUCE_MAX) .build(); ListTimeSeriesRequest request = @@ -269,7 +269,7 @@ public final class MonitoringClient { Aggregation aggregation = Aggregation.newBuilder() .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build()) - .setPerSeriesAligner(Aggregation.Aligner.ALIGN_RATE) + .setPerSeriesAligner(Aligner.ALIGN_RATE) .build(); ListTimeSeriesRequest request = ListTimeSeriesRequest.newBuilder() @@ -312,7 +312,7 @@ public final class MonitoringClient { Aggregation aggregation = Aggregation.newBuilder() .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build()) - .setPerSeriesAligner(Aggregation.Aligner.ALIGN_RATE) + .setPerSeriesAligner(Aligner.ALIGN_RATE) .build(); ListTimeSeriesRequest request = ListTimeSeriesRequest.newBuilder() diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java index c9964d16f3b..5a101e08d37 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/matchers/SpannerAsserts.java @@ -17,13 +17,17 @@ */ package org.apache.beam.it.gcp.spanner.matchers; +import static org.apache.beam.it.gcp.artifacts.utils.JsonTestUtil.parseJsonString; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatRecords; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.Type; +import com.google.cloud.spanner.Type.Code; import com.google.cloud.spanner.Value; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -81,6 +85,47 @@ public class SpannerAsserts { } } + /** + * Convert a list of Spanner {@link Mutation} objects into a list of maps, extracting specified + * columns. + * + * @param mutations The list of mutations to process. + * @param columns The columns to extract. + * @return List of maps to use in {@link RecordsSubject} + */ + public static List<Map<String, Object>> mutationsToRecords( + List<Mutation> mutations, List<String> columns) { + try { + List<Map<String, Object>> records = new ArrayList<>(); + mutations.forEach( + entry -> { + records.add( + entry.asMap().entrySet().stream() + .filter((e) -> columns.contains(e.getKey())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + (e) -> { + if (e.getValue().getType().getCode() == Code.ARRAY) { + return e.getValue().getAsStringList(); + } + if (Arrays.asList(Code.JSON, Code.PG_JSONB) + .contains(e.getValue().getType().getCode())) { + try { + return parseJsonString(e.getValue().getJson()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + return e.getValue().getAsString(); + }))); + }); + return records; + } catch (Exception e) { + throw new RuntimeException("Error converting TableResult to Records", e); + } + } + /** * Creates a {@link RecordsSubject} to assert information within a list of records. * diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java index e232ed31cb5..a6516863b8d 100644 --- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java @@ -59,7 +59,7 @@ import org.junit.Test; * -DfailIfNoTests=false". * * <p>Example trigger command for specific test: "mvn test -pl it/google-cloud-platform -am \ - * -Dtest="BigTableIOLT#testWriteAndRead" -Dconfiguration=local -Dproject=[gcpProject] \ + * -Dtest="BigTableIOLT#testBigtableWriteAndRead" -Dconfiguration=local -Dproject=[gcpProject] \ * -DartifactBucket=[temp bucket] -DfailIfNoTests=false". */ public class BigTableIOLT extends IOLoadTestBase { @@ -67,7 +67,7 @@ public class BigTableIOLT extends IOLoadTestBase { private static final String COLUMN_FAMILY_NAME = "cf"; private static final long TABLE_MAX_AGE_MINUTES = 100L; - private static BigtableResourceManager resourceManager; + private BigtableResourceManager resourceManager; private static final String READ_ELEMENT_METRIC_NAME = "read_count"; private Configuration configuration; private String tableId; @@ -114,7 +114,7 @@ public class BigTableIOLT extends IOLoadTestBase { /** Run integration test with configurations specified by TestProperties. */ @Test - public void testWriteAndRead() throws IOException { + public void testBigtableWriteAndRead() throws IOException { tableId = generateTableId(testName); resourceManager.createTable( @@ -205,7 +205,7 @@ public class BigTableIOLT extends IOLoadTestBase { return pipelineLauncher.launch(project, region, options); } - /** Options for Bigquery IO load test. */ + /** Options for BigtableIO load test. */ @AutoValue abstract static class Configuration { abstract Long getNumRows(); @@ -227,18 +227,18 @@ public class BigTableIOLT extends IOLoadTestBase { @AutoValue.Builder abstract static class Builder { - abstract Configuration.Builder setNumRows(long numRows); + abstract Builder setNumRows(long numRows); - abstract Configuration.Builder setPipelineTimeout(int timeOutMinutes); + abstract Builder setPipelineTimeout(int timeOutMinutes); - abstract Configuration.Builder setRunner(String runner); + abstract Builder setRunner(String runner); - abstract Configuration.Builder setValueSizeBytes(int valueSizeBytes); + abstract Builder setValueSizeBytes(int valueSizeBytes); abstract Configuration build(); } - abstract Configuration.Builder toBuilder(); + abstract Builder toBuilder(); } /** Maps long number to the BigTable format record. */ diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerIOLT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerIOLT.java new file mode 100644 index 00000000000..949b863be3d --- /dev/null +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerIOLT.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.gcp.spanner; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.cloud.ByteArray; +import com.google.cloud.spanner.Mutation; +import java.io.IOException; +import java.io.Serializable; +import java.text.ParseException; +import java.time.Duration; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.TestProperties; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.IOLoadTestBase; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * SpannerIO performance tests. + * + * <p>Example trigger command for all tests: "mvn test -pl it/google-cloud-platform -am + * -Dtest=SpannerIOLT \ -Dproject=[gcpProject] -DartifactBucket=[temp bucket] + * -DfailIfNoTests=false". + * + * <p>Example trigger command for specific test: "mvn test -pl it/google-cloud-platform -am \ + * -Dtest="SpannerIOLT#testSpannerWriteAndRead" -Dconfiguration=local -Dproject=[gcpProject] \ + * -DartifactBucket=[temp bucket] -DfailIfNoTests=false". + */ +public class SpannerIOLT extends IOLoadTestBase { + @Rule public TestPipeline writePipeline = TestPipeline.create(); + @Rule public TestPipeline readPipeline = TestPipeline.create(); + private String tableName; + private SpannerResourceManager resourceManager; + private Configuration configuration; + private static final String READ_ELEMENT_METRIC_NAME = "read_count"; + + @Before + public void setup() throws IOException { + // generate a random table name + tableName = + "io_spanner_" + + DateTimeFormatter.ofPattern("MMddHHmmssSSS") + .withZone(ZoneId.of("UTC")) + .format(java.time.Instant.now()) + + UUID.randomUUID().toString().replace("-", "").substring(0, 10); + + resourceManager = SpannerResourceManager.builder(testName, project, region).build(); + + // parse configuration + String testConfig = + TestProperties.getProperty("configuration", "local", TestProperties.Type.PROPERTY); + configuration = TEST_CONFIGS_PRESET.get(testConfig); + if (configuration == null) { + try { + configuration = Configuration.fromJsonString(testConfig, Configuration.class); + } catch (IOException e) { + throw new IllegalArgumentException( + String.format( + "Unknown test configuration: [%s]. Pass to a valid configuration json, or use" + + " config presets: %s", + testConfig, TEST_CONFIGS_PRESET.keySet())); + } + } + // prepare schema + String createTable = + createTableStatement( + tableName, configuration.numColumns, (int) configuration.valueSizeBytes); + // Create table + resourceManager.executeDdlStatement(createTable); + } + + @After + public void teardown() { + ResourceManagerUtils.cleanResources(resourceManager); + } + + private static final Map<String, Configuration> TEST_CONFIGS_PRESET; + + static { + try { + TEST_CONFIGS_PRESET = + ImmutableMap.of( + "local", + Configuration.fromJsonString( + "{\"numRecords\":1000,\"valueSizeBytes\":1000,\"pipelineTimeout\":2,\"runner\":\"DirectRunner\"}", + Configuration.class), // 1 MB + "medium", + Configuration.fromJsonString( + "{\"numRecords\":10000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\"}", + Configuration.class), // 10 GB + "large", + Configuration.fromJsonString( + "{\"numRecords\":100000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":80,\"runner\":\"DataflowRunner\"}", + Configuration.class) // 100 GB + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testSpannerWriteAndRead() throws IOException { + PipelineLauncher.LaunchInfo writeInfo = testWrite(); + PipelineOperator.Result writeResult = + pipelineOperator.waitUntilDone( + createConfig(writeInfo, Duration.ofMinutes(configuration.pipelineTimeout))); + assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, writeResult); + + PipelineLauncher.LaunchInfo readInfo = testRead(); + PipelineOperator.Result result = + pipelineOperator.waitUntilDone( + createConfig(readInfo, Duration.ofMinutes(configuration.pipelineTimeout))); + assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, result); + assertEquals( + PipelineLauncher.JobState.DONE, + pipelineLauncher.getJobStatus(project, region, readInfo.jobId())); + double numRecords = + pipelineLauncher.getMetric( + project, + region, + readInfo.jobId(), + getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME)); + assertEquals(configuration.numRecords, numRecords, 0.5); + + // export metrics + MetricsConfiguration metricsConfig = + MetricsConfiguration.builder() + .setInputPCollection("Map records.out0") + .setInputPCollectionV2("Map records/ParMultiDo(GenerateMutations).out0") + .setOutputPCollection("Counting element.out0") + .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0") + .build(); + try { + exportMetricsToBigQuery(writeInfo, getMetrics(writeInfo, metricsConfig)); + exportMetricsToBigQuery(readInfo, getMetrics(readInfo, metricsConfig)); + } catch (ParseException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private PipelineLauncher.LaunchInfo testWrite() throws IOException { + SpannerIO.Write writeTransform = + SpannerIO.write() + .withProjectId(project) + .withInstanceId(resourceManager.getInstanceId()) + .withDatabaseId(resourceManager.getDatabaseId()); + + writePipeline + .apply(GenerateSequence.from(0).to(configuration.numRecords)) + .apply( + "Map records", + ParDo.of( + new GenerateMutations( + tableName, configuration.numColumns, (int) configuration.valueSizeBytes))) + .apply("Write to Spanner", writeTransform); + + PipelineLauncher.LaunchConfig options = + PipelineLauncher.LaunchConfig.builder("write-spanner") + .setSdk(PipelineLauncher.Sdk.JAVA) + .setPipeline(writePipeline) + .addParameter("runner", configuration.runner) + .build(); + + return pipelineLauncher.launch(project, region, options); + } + + private PipelineLauncher.LaunchInfo testRead() throws IOException { + SpannerIO.Read readTrabsfirn = + SpannerIO.read() + .withProjectId(project) + .withInstanceId(resourceManager.getInstanceId()) + .withDatabaseId(resourceManager.getDatabaseId()) + .withQuery(String.format("SELECT * FROM %s", tableName)); + + readPipeline + .apply("Read from Spanner", readTrabsfirn) + .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME))); + + PipelineLauncher.LaunchConfig options = + PipelineLauncher.LaunchConfig.builder("read-spanner") + .setSdk(PipelineLauncher.Sdk.JAVA) + .setPipeline(readPipeline) + .addParameter("runner", configuration.runner) + .build(); + + return pipelineLauncher.launch(project, region, options); + } + + /** Options for SpannerIO load test. */ + static class Configuration extends SyntheticSourceOptions { + + /** + * Number of columns (besides the primary key) of each record. The column size is equally + * distributed as valueSizeBytes/numColumns. + */ + @JsonProperty public int numColumns = 1; + + /** Pipeline timeout in minutes. Must be a positive value. */ + @JsonProperty public int pipelineTimeout = 20; + + /** Runner specified to run the pipeline. */ + @JsonProperty public String runner = "DirectRunner"; + } + + /** + * Generate a create table sql statement with 1 integer column (Id) and additional numBytesCol + * columns. + */ + static String createTableStatement(String tableId, int numBytesCol, int valueSizeBytes) { + int sizePerCol = valueSizeBytes / numBytesCol; + StringBuilder statement = new StringBuilder(); + statement.append(String.format("CREATE TABLE %s (Id INT64", tableId)); + for (int col = 0; col < numBytesCol; ++col) { + statement.append(String.format(",\n COL%d BYTES(%d)", col + 1, sizePerCol)); + } + statement.append(") PRIMARY KEY(Id)"); + return statement.toString(); + } + + /** Maps long number to the Spanner format record. */ + private static class GenerateMutations extends DoFn<Long, Mutation> implements Serializable { + private final String table; + private final int numBytesCol; + private final int sizePerCol; + + public GenerateMutations(String tableId, int numBytesCol, int valueSizeBytes) { + checkArgument(valueSizeBytes >= numBytesCol); + this.table = tableId; + this.numBytesCol = numBytesCol; + this.sizePerCol = valueSizeBytes / numBytesCol; + } + + @ProcessElement + public void processElement(ProcessContext c) { + Mutation.WriteBuilder builder = Mutation.newInsertOrUpdateBuilder(table); + Long key = Objects.requireNonNull(c.element()); + builder.set("Id").to(key); + Random random = new Random(key); + byte[] value = new byte[sizePerCol]; + for (int col = 0; col < numBytesCol; ++col) { + String name = String.format("COL%d", col + 1); + random.nextBytes(value); + builder.set(name).to(ByteArray.copyFrom(value)); + } + Mutation mutation = builder.build(); + c.output(mutation); + } + } +} diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java index 704f8337c66..a36f3b340e8 100644 --- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java @@ -90,7 +90,7 @@ public class FileBasedIOLT extends IOLoadTestBase { @Rule public TestPipeline readPipeline = TestPipeline.create(); - private static final Map<String, FileBasedIOLT.Configuration> TEST_CONFIGS_PRESET; + private static final Map<String, Configuration> TEST_CONFIGS_PRESET; static { try { diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/AbstractJDBCResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/AbstractJDBCResourceManager.java index b57185b70eb..6d50dddb0cc 100644 --- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/AbstractJDBCResourceManager.java +++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/AbstractJDBCResourceManager.java @@ -196,7 +196,8 @@ public abstract class AbstractJDBCResourceManager<T extends JdbcDatabaseContaine valueList.add(null); } else if (NumberUtils.isCreatable(value.toString()) || "true".equalsIgnoreCase(value.toString()) - || "false".equalsIgnoreCase(value.toString())) { + || "false".equalsIgnoreCase(value.toString()) + || value.toString().startsWith("ARRAY[")) { valueList.add(String.valueOf(value)); } else { valueList.add("'" + value + "'"); @@ -226,34 +227,9 @@ public abstract class AbstractJDBCResourceManager<T extends JdbcDatabaseContaine @SuppressWarnings("nullness") public List<Map<String, Object>> readTable(String tableName) { LOG.info("Reading all rows from {}.{}", databaseName, tableName); - - List<Map<String, Object>> resultSet = new ArrayList<>(); - - StringBuilder sql = new StringBuilder(); - try (Connection con = driver.getConnection(getUri(), username, password)) { - Statement stmt = con.createStatement(); - - sql.append("SELECT * FROM ").append(tableName); - ResultSet result = stmt.executeQuery(sql.toString()); - - while (result.next()) { - Map<String, Object> row = new HashMap<>(); - ResultSetMetaData metadata = result.getMetaData(); - // Columns list in table metadata is 1-indexed - for (int i = 1; i <= metadata.getColumnCount(); i++) { - row.put(metadata.getColumnName(i), result.getObject(i)); - } - resultSet.add(row); - } - result.close(); - stmt.close(); - } catch (Exception e) { - throw new JDBCResourceManagerException( - "Failed to fetch rows from table. SQL statement: " + sql, e); - } - + List<Map<String, Object>> result = runSQLQuery(String.format("SELECT * FROM %s", tableName)); LOG.info("Successfully loaded rows from {}.{}", databaseName, tableName); - return resultSet; + return result; } @Override @@ -290,9 +266,21 @@ public abstract class AbstractJDBCResourceManager<T extends JdbcDatabaseContaine } @Override - public synchronized ResultSet runSQLQuery(String sql) { + @SuppressWarnings("nullness") + public synchronized List<Map<String, Object>> runSQLQuery(String sql) { try (Statement stmt = driver.getConnection(getUri(), username, password).createStatement()) { - return stmt.executeQuery(sql); + List<Map<String, Object>> result = new ArrayList<>(); + ResultSet resultSet = stmt.executeQuery(sql); + while (resultSet.next()) { + Map<String, Object> row = new HashMap<>(); + ResultSetMetaData metadata = resultSet.getMetaData(); + // Columns list in table metadata is 1-indexed + for (int i = 1; i <= metadata.getColumnCount(); i++) { + row.put(metadata.getColumnName(i), resultSet.getObject(i)); + } + result.add(row); + } + return result; } catch (Exception e) { throw new JDBCResourceManagerException("Failed to execute SQL statement: " + sql, e); } @@ -307,6 +295,21 @@ public abstract class AbstractJDBCResourceManager<T extends JdbcDatabaseContaine } } + @Override + public synchronized long getRowCount(String tableName) { + try (Connection con = driver.getConnection(getUri(), username, password)) { + Statement stmt = con.createStatement(); + ResultSet resultSet = stmt.executeQuery(String.format("SELECT count(*) FROM %s", tableName)); + resultSet.next(); + long rows = resultSet.getLong(1); + resultSet.close(); + stmt.close(); + return rows; + } catch (Exception e) { + throw new JDBCResourceManagerException("Failed to get row count from " + tableName, e); + } + } + /** * Builder for {@link AbstractJDBCResourceManager}. * diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/JDBCResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/JDBCResourceManager.java index 9292d4cb42e..deb29ff3a5e 100644 --- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/JDBCResourceManager.java +++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/JDBCResourceManager.java @@ -19,7 +19,6 @@ package org.apache.beam.it.jdbc; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import java.sql.ResultSet; import java.util.List; import java.util.Map; import org.apache.beam.it.common.ResourceManager; @@ -102,7 +101,7 @@ public interface JDBCResourceManager extends ResourceManager { * @param sql The SQL query to run. * @return A ResultSet containing the result of the execution. */ - ResultSet runSQLQuery(String sql); + List<Map<String, Object>> runSQLQuery(String sql); /** * Run the given SQL DML statement (INSERT, UPDATE and DELETE). @@ -111,6 +110,14 @@ public interface JDBCResourceManager extends ResourceManager { */ void runSQLUpdate(String sql); + /** + * Gets the number of rows in table. + * + * @param tableName The name of the table. + * @return a count of number of rows in the table. + */ + long getRowCount(String tableName); + /** Object for managing JDBC table schemas in {@link JDBCResourceManager} instances. */ class JDBCSchema { diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java index c515b2c4844..0bcb16c6109 100644 --- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java +++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java @@ -61,14 +61,13 @@ public class MSSQLResourceManager } @VisibleForTesting - <T extends MSSQLResourceManager.DefaultMSSQLServerContainer<T>> MSSQLResourceManager( - T container, Builder builder) { + <T extends DefaultMSSQLServerContainer<T>> MSSQLResourceManager(T container, Builder builder) { super(container, builder); initialized = true; } - public static MSSQLResourceManager.Builder builder(String testId) { - return new MSSQLResourceManager.Builder(testId); + public static Builder builder(String testId) { + return new Builder(testId); } private synchronized void createDatabase(String databaseName) { diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java index 688c26dfb56..e1bf3640b53 100644 --- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java +++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java @@ -49,8 +49,8 @@ public class MySQLResourceManager extends AbstractJDBCResourceManager<MySQLConta super(container, builder); } - public static MySQLResourceManager.Builder builder(String testId) { - return new MySQLResourceManager.Builder(testId); + public static Builder builder(String testId) { + return new Builder(testId); } @Override diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java index 8054d26c33f..f44e939936d 100644 --- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java +++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java @@ -45,7 +45,7 @@ public class OracleResourceManager extends AbstractJDBCResourceManager<OracleCon private static final String DEFAULT_ORACLE_USERNAME = "testUser"; private static final String DEFAULT_ORACLE_PASSWORD = "testPassword"; - private OracleResourceManager(OracleResourceManager.Builder builder) { + private OracleResourceManager(Builder builder) { this( new OracleContainer( DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)), @@ -53,12 +53,12 @@ public class OracleResourceManager extends AbstractJDBCResourceManager<OracleCon } @VisibleForTesting - OracleResourceManager(OracleContainer container, OracleResourceManager.Builder builder) { + OracleResourceManager(OracleContainer container, Builder builder) { super(container, builder); } - public static OracleResourceManager.Builder builder(String testId) { - return new OracleResourceManager.Builder(testId); + public static Builder builder(String testId) { + return new Builder(testId); } @Override diff --git a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java index 7f054dfbc5d..26bdff2305b 100644 --- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java +++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java @@ -38,21 +38,13 @@ public class PostgresResourceManager extends AbstractJDBCResourceManager<Postgre // https://hub.docker.com/_/postgres/tags?tab=tags private static final String DEFAULT_POSTGRES_CONTAINER_TAG = "15.1"; - private PostgresResourceManager(PostgresResourceManager.Builder builder) { - this( - new PostgreSQLContainer<>( - DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)), - builder); - } - @VisibleForTesting - PostgresResourceManager( - PostgreSQLContainer<?> container, PostgresResourceManager.Builder builder) { + PostgresResourceManager(PostgreSQLContainer<?> container, Builder builder) { super(container, builder); } - public static PostgresResourceManager.Builder builder(String testId) { - return new PostgresResourceManager.Builder(testId); + public static Builder builder(String testId) { + return new Builder(testId); } @Override @@ -80,7 +72,11 @@ public class PostgresResourceManager extends AbstractJDBCResourceManager<Postgre @Override public PostgresResourceManager build() { - return new PostgresResourceManager(this); + PostgreSQLContainer<?> container = + new PostgreSQLContainer<>( + DockerImageName.parse(containerImageName).withTag(containerImageTag)); + container.setCommand("postgres", "-c", "fsync=off", "-c", "max_connections=1000"); + return new PostgresResourceManager(container, this); } } } diff --git a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java index 7f7fb5b6956..d9a647dbeeb 100644 --- a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java +++ b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java @@ -71,16 +71,13 @@ public class KafkaResourceManager extends TestContainerResourceManager<GenericCo private final String connectionString; private final boolean usingStaticTopic; - private KafkaResourceManager(KafkaResourceManager.Builder builder) { + private KafkaResourceManager(Builder builder) { this(null, new DefaultKafkaContainer(builder), builder); } @VisibleForTesting @SuppressWarnings("nullness") - KafkaResourceManager( - @Nullable AdminClient client, - KafkaContainer container, - KafkaResourceManager.Builder builder) { + KafkaResourceManager(@Nullable AdminClient client, KafkaContainer container, Builder builder) { super(container, builder); this.usingStaticTopic = builder.topicNames.size() > 0; @@ -105,8 +102,8 @@ public class KafkaResourceManager extends TestContainerResourceManager<GenericCo : AdminClient.create(ImmutableMap.of("bootstrap.servers", this.connectionString)); } - public static KafkaResourceManager.Builder builder(String testId) { - return new KafkaResourceManager.Builder(testId); + public static Builder builder(String testId) { + return new Builder(testId); } /** Returns the kafka bootstrap server connection string. */ diff --git a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java index 80216b14ac0..ed0e556bf0d 100644 --- a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java +++ b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java @@ -69,7 +69,7 @@ public class MongoDBResourceManager extends TestContainerResourceManager<MongoDB private final String connectionString; private final boolean usingStaticDatabase; - private MongoDBResourceManager(MongoDBResourceManager.Builder builder) { + private MongoDBResourceManager(Builder builder) { this( /* mongoClient= */ null, new MongoDBContainer( @@ -80,9 +80,7 @@ public class MongoDBResourceManager extends TestContainerResourceManager<MongoDB @VisibleForTesting @SuppressWarnings("nullness") MongoDBResourceManager( - @Nullable MongoClient mongoClient, - MongoDBContainer container, - MongoDBResourceManager.Builder builder) { + @Nullable MongoClient mongoClient, MongoDBContainer container, Builder builder) { super(container, builder); this.usingStaticDatabase = builder.databaseName != null; @@ -93,8 +91,8 @@ public class MongoDBResourceManager extends TestContainerResourceManager<MongoDB this.mongoClient = mongoClient == null ? MongoClients.create(connectionString) : mongoClient; } - public static MongoDBResourceManager.Builder builder(String testId) { - return new MongoDBResourceManager.Builder(testId); + public static Builder builder(String testId) { + return new Builder(testId); } /** Returns the URI connection string to the MongoDB Database. */ diff --git a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java index 1a1b86acf56..ec08854b5c6 100644 --- a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java +++ b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java @@ -31,7 +31,7 @@ import org.bson.Document; public class MongoDBAsserts { /** - * Convert MongoDB {@link org.bson.Document} to a list of maps. + * Convert MongoDB {@link Document} to a list of maps. * * @param documents List of Documents to parse * @return List of maps to use in {@link RecordsSubject} diff --git a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java index 97bcca9e84b..835be71ce0f 100644 --- a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java +++ b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java @@ -66,7 +66,7 @@ public class Neo4jResourceManager extends TestContainerResourceManager<Neo4jCont private final String adminPassword; - private Neo4jResourceManager(Neo4jResourceManager.Builder builder) { + private Neo4jResourceManager(Builder builder) { this( builder.driver, new Neo4jContainer<>( @@ -79,10 +79,7 @@ public class Neo4jResourceManager extends TestContainerResourceManager<Neo4jCont @VisibleForTesting @SuppressWarnings("nullness") - Neo4jResourceManager( - @Nullable Driver neo4jDriver, - Neo4jContainer<?> container, - Neo4jResourceManager.Builder builder) { + Neo4jResourceManager(@Nullable Driver neo4jDriver, Neo4jContainer<?> container, Builder builder) { super(container, builder); this.adminPassword = builder.adminPassword; @@ -101,8 +98,8 @@ public class Neo4jResourceManager extends TestContainerResourceManager<Neo4jCont } } - public static Neo4jResourceManager.Builder builder(String testId) { - return new Neo4jResourceManager.Builder(testId); + public static Builder builder(String testId) { + return new Builder(testId); } /** Returns the URI connection string to the Neo4j Database. */ diff --git a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java index 32e283edb72..16f25133842 100644 --- a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java +++ b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java @@ -20,6 +20,9 @@ package org.apache.beam.it.neo4j.conditions; import com.google.auto.value.AutoValue; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.it.conditions.ConditionCheck; import org.apache.beam.it.neo4j.Neo4jResourceManager; @@ -55,9 +58,17 @@ public abstract class Neo4jQueryCheck extends ConditionCheck { if (actualResult == null) { return new CheckResult(expectedResult == null); } + + Set<Map<String, Object>> sortedActualResult = sort(actualResult); + Set<Map<String, Object>> sortedExpectedResult = sort(expectedResult); + return new CheckResult( - actualResult.equals(expectedResult), - String.format("Expected %s to equal %s", actualResult, expectedResult)); + sortedActualResult.equals(sortedExpectedResult), + String.format("Expected %s to equal %s", sortedActualResult, sortedExpectedResult)); + } + + private static Set<Map<String, Object>> sort(List<Map<String, Object>> list) { + return list.stream().map(TreeMap::new).collect(Collectors.toSet()); } public static Builder builder(Neo4jResourceManager resourceManager) { diff --git a/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java b/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java index 1ef4726df43..0115a791eef 100644 --- a/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java +++ b/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java @@ -85,7 +85,7 @@ public class SplunkResourceManager extends TestContainerResourceManager<SplunkCo private final SplunkClientFactory clientFactory; @SuppressWarnings("resource") - private SplunkResourceManager(SplunkResourceManager.Builder builder) { + private SplunkResourceManager(Builder builder) { this( new SplunkClientFactory(), new SplunkContainer( @@ -98,9 +98,7 @@ public class SplunkResourceManager extends TestContainerResourceManager<SplunkCo @VisibleForTesting @SuppressWarnings("nullness") SplunkResourceManager( - SplunkClientFactory clientFactory, - SplunkContainer container, - SplunkResourceManager.Builder builder) { + SplunkClientFactory clientFactory, SplunkContainer container, Builder builder) { super(setup(container, builder), builder); String username = DEFAULT_SPLUNK_USERNAME; @@ -169,8 +167,8 @@ public class SplunkResourceManager extends TestContainerResourceManager<SplunkCo .withPassword(builder.password); } - public static SplunkResourceManager.Builder builder(String testId) { - return new SplunkResourceManager.Builder(testId); + public static Builder builder(String testId) { + return new Builder(testId); } /** diff --git a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java index 77dd6da5805..098938a291d 100644 --- a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java +++ b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java @@ -32,11 +32,11 @@ import org.testcontainers.containers.GenericContainer; * resources. * * <p>Optionally, a static resource can be specified by calling the useStaticContainer() method in - * the {@link TestContainerResourceManager.Builder} class. A static resource is a pre-configured - * database or other resource that is ready to be connected to by the resource manager. This could - * be a pre-existing TestContainer that has not been closed, a local database instance, a remote VM, - * or any other source that can be connected to. If a static container is used, the host and port - * must also be configured using the Builder's setHost() and setPort() methods, respectively. + * the {@link Builder} class. A static resource is a pre-configured database or other resource that + * is ready to be connected to by the resource manager. This could be a pre-existing TestContainer + * that has not been closed, a local database instance, a remote VM, or any other source that can be + * connected to. If a static container is used, the host and port must also be configured using the + * Builder's setHost() and setPort() methods, respectively. */ public abstract class TestContainerResourceManager<T extends GenericContainer<?>> implements ResourceManager { @@ -48,12 +48,11 @@ public abstract class TestContainerResourceManager<T extends GenericContainer<?> private final String host; protected int port; - protected <B extends TestContainerResourceManager.Builder<?>> TestContainerResourceManager( - T container, B builder) { + protected <B extends Builder<?>> TestContainerResourceManager(T container, B builder) { this(container, builder, null); } - protected <B extends TestContainerResourceManager.Builder<?>> TestContainerResourceManager( + protected <B extends Builder<?>> TestContainerResourceManager( T container, B builder, @Nullable Callable<Void> setup) { this.container = container; this.usingStaticContainer = builder.useStaticContainer; diff --git a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java index 39a0c0cebed..75d5ce3a67c 100644 --- a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java +++ b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java @@ -81,7 +81,7 @@ public class RecordsSubject extends Subject { Map<String, Object> expected = convertMapToTreeMap(subset); for (Map<String, Object> candidate : actual) { boolean match = true; - for (Map.Entry<String, Object> entry : subset.entrySet()) { + for (Entry<String, Object> entry : subset.entrySet()) { if (!candidate.containsKey(entry.getKey()) || !candidate.get(entry.getKey()).equals(entry.getValue())) { match = false;