johnjcasey commented on code in PR #26444: URL: https://github.com/apache/beam/pull/26444#discussion_r1259800548
########## it/common/build.gradle: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + exportJavadoc: false, + automaticModuleName: 'org.apache.beam.it.common', + validateShadowJar: false, + shadowClosure: {}, +) + +description = "Apache Beam :: IT :: Common" +ext.summary = "Code used by all integration test utilities." + +dependencies { + implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.google_api_services_dataflow + implementation library.java.google_auth_library_credentials + implementation library.java.google_auth_library_oauth2_http + implementation library.java.vendored_guava_26_0_jre + implementation library.java.slf4j_api + implementation library.java.commons_lang3 + implementation 'dev.failsafe:failsafe:3.3.0' Review Comment: please do this as a library in the plugin ########## it/testcontainers/build.gradle: ########## @@ -0,0 +1,37 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* License); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an AS IS BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.it.testcontainers', + enableSpotbugs: false, + validateShadowJar: false, + shadowClosure: {}, +) + +description = "Apache Beam :: IT :: Testcontainers" +ext.summary = "Integration test utilities for Testcontainers." + +dependencies { + implementation project(path: ":it:common", configuration: "shadow") + implementation library.java.testcontainers_base + + testImplementation 'com.google.truth:truth:1.0.1' Review Comment: This should be done as a library, similar to our other libraries ########## it/common/src/main/java/org/apache/beam/it/common/TestProperties.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.common; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auth.Credentials; +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.ComputeEngineCredentials; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility for accessing system properties set for the test. + * + * <p>There are two types of properties: those set on the command lines and those set as environment + * variables. Those set on the command line always follow a camelCase naming convention, and those + * set as environment variable always follow a CAPITALIZED_SNAKE_CASE naming convention. + */ +public final class TestProperties { + private TestProperties() {} + + // For testability, it is normally best to expect each property from the command line. We should + // only expect an environment variable if we're trying to avoid an accidental log of the + // value. 
+ + // From command line + public static final String ARTIFACT_BUCKET_KEY = "artifactBucket"; + public static final String PROJECT_KEY = "project"; + public static final String REGION_KEY = "region"; + public static final String STAGE_BUCKET = "stageBucket"; + public static final String EXPORT_DATASET_KEY = "exportDataset"; + public static final String EXPORT_PROJECT = "exportProject"; + public static final String EXPORT_TABLE_KEY = "exportTable"; + public static final String SPEC_PATH_KEY = "specPath"; + public static final String HOST_IP = "hostIp"; + // From environment variables + public static final String ACCESS_TOKEN_KEY = "DT_IT_ACCESS_TOKEN"; + + // Default values for optional properties + public static final String DEFAULT_REGION = "us-central1"; + + // Error messages + private static final String CLI_ERR_MSG = "-D%s is required on the command line"; + private static final String ENV_VAR_MSG = "%s is required as an environment variable"; + + private static final Logger LOG = LoggerFactory.getLogger(TestProperties.class); + + public static boolean hasAccessToken() { + return getProperty(ACCESS_TOKEN_KEY, null, Type.ENVIRONMENT_VARIABLE) != null; + } + + public static String accessToken() { + return getProperty(ACCESS_TOKEN_KEY, Type.ENVIRONMENT_VARIABLE, true); + } + + /** + * Create and return credentials based on whether access token was provided or not. + * + * <p>If access token was provided, use the token for Bearer authentication. + * + * <p>If not, use Application Default Credentials. Check + * https://cloud.google.com/docs/authentication/application-default-credentials for more + * information. + * + * @return Credentials. 
+ */ + public static Credentials credentials() { + if (hasAccessToken()) { + return googleCredentials(); + } else { + return buildCredentialsFromEnv(); + } + } + + public static Credentials googleCredentials() { + Credentials credentials; + try { + if (hasAccessToken()) { + credentials = + new GoogleCredentials(new AccessToken(accessToken(), /* expirationTime= */ null)); + } else { + credentials = GoogleCredentials.getApplicationDefault(); + } + } catch (IOException e) { + throw new RuntimeException( + "Unable to get credentials! \n" + + "Please run the following command to set 60 minute access token, \n" + + "\t export DT_IT_ACCESS_TOKEN=$(gcloud auth application-default print-access-token) \n" + + "Please run the following command to set credentials using the gcloud command, " + + "\t gcloud auth application-default login"); + } + return credentials; + } + + public static boolean hasArtifactBucket() { + return getProperty(ARTIFACT_BUCKET_KEY, null, Type.PROPERTY) != null; + } + + public static String artifactBucket() { + return bucketNameOnly(getProperty(ARTIFACT_BUCKET_KEY, Type.PROPERTY, true)); + } + + public static String exportDataset() { + return getProperty(EXPORT_DATASET_KEY, Type.PROPERTY, false); + } + + public static String exportProject() { + return getProperty(EXPORT_PROJECT, Type.PROPERTY, false); + } + + public static String exportTable() { + return getProperty(EXPORT_TABLE_KEY, Type.PROPERTY, false); + } + + public static String project() { + return getProperty(PROJECT_KEY, Type.PROPERTY, true); + } + + public static String region() { + return getProperty(REGION_KEY, DEFAULT_REGION, Type.PROPERTY); + } + + public static String specPath() { + return getProperty(SPEC_PATH_KEY, Type.PROPERTY, false); + } + + public static boolean hasStageBucket() { + return getProperty(STAGE_BUCKET, null, Type.PROPERTY) != null; + } + + public static String stageBucket() { + return bucketNameOnly(getProperty(STAGE_BUCKET, Type.PROPERTY, false)); + } + + public static 
String hostIp() { + return getProperty(HOST_IP, "localhost", Type.PROPERTY); + } + + /** Gets a property or throws an exception if it is not found. */ + private static String getProperty(String name, Type type, boolean required) { + String value = getProperty(name, null, type); + + if (required) { + String errMsg = + type == Type.PROPERTY + ? String.format(CLI_ERR_MSG, name) + : String.format(ENV_VAR_MSG, name); + checkState(value != null, errMsg); + } + + return value; + } + + /** Gets a property or returns {@code defaultValue} if it is not found. */ + public static String getProperty(String name, @Nullable String defaultValue, Type type) { + String value = type == Type.PROPERTY ? System.getProperty(name) : System.getenv(name); + return value != null ? value : defaultValue; + } + + /** Defines the types of properties there may be. */ + public enum Type { + PROPERTY, + ENVIRONMENT_VARIABLE + } + + /** + * Infers the {@link Credentials} to use with Google services from the current environment + * settings. + * + * <p>First, checks if {@link ServiceAccountCredentials#getApplicationDefault()} returns Compute + * Engine credentials, which means that it is running from a GCE instance and can use the Service + * Account configured for that VM. Will use that + * + * <p>Secondly, it will try to get the environment variable + * <strong>GOOGLE_APPLICATION_CREDENTIALS</strong>, and use that Service Account if configured to + * doing so. The method {@link #getCredentialsStream()} will make sure to search for the specific + * file using both the file system and classpath. + * + * <p>If <strong>GOOGLE_APPLICATION_CREDENTIALS</strong> is not configured, it will return the + * application default, which is often setup through <strong>gcloud auth application-default + * login</strong>. + */ + public static Credentials buildCredentialsFromEnv() { + try { + + // if on Compute Engine, return default credentials. 
+ GoogleCredentials applicationDefault = ServiceAccountCredentials.getApplicationDefault(); + try { + if (applicationDefault instanceof ComputeEngineCredentials) { Review Comment: this looks quite strange, what throws an exception here? ########## it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.it.cassandra; + +import static org.apache.beam.it.cassandra.CassandraResourceManagerUtils.generateKeyspaceName; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DriverTimeoutException; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import org.apache.beam.it.common.ResourceManager; +import org.apache.beam.it.testcontainers.TestContainerResourceManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Client for managing Cassandra resources. + * + * <p>The class supports one database and multiple collections per database object. A database is + * created when the first collection is created if one has not been created already. + * + * <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time, + * microsecond precision}", with additional formatting. + * + * <p>The class is thread-safe. 
+ */ +public class CassandraResourceManager extends TestContainerResourceManager<GenericContainer<?>> + implements ResourceManager { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraResourceManager.class); + + private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra"; + + // A list of available Cassandra Docker image tags can be found at + // https://hub.docker.com/_/cassandra/tags + private static final String DEFAULT_CASSANDRA_CONTAINER_TAG = "4.1.0"; + + // 27017 is the default port that Cassandra is configured to listen on + private static final int CASSANDRA_INTERNAL_PORT = 9042; Review Comment: the comment and the variable don't align ########## it/common/src/main/java/org/apache/beam/it/common/utils/ResourceManagerUtils.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.it.common.utils; + +import static java.lang.Math.min; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing.goodFastHash; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Random; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.apache.beam.it.common.ResourceManager; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashFunction; +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Common utilities for ResourceManager implementations. */ +public class ResourceManagerUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerUtils.class); + + private static final int MIN_PROJECT_ID_LENGTH = 4; + private static final int MAX_PROJECT_ID_LENGTH = 30; + private static final Pattern ILLEGAL_PROJECT_CHARS = Pattern.compile("[^a-zA-Z0-9-!:\\.']"); + private static final String TIME_ZONE = "UTC"; + + /** + * Generates a new id string from an existing one. + * + * @param id The id string to generate a new id from. + * @param targetLength The length of the new id to generate. Must be greater than 8. + */ + public static String generateNewId(String id, int targetLength) { + if (id.length() <= targetLength) { + return id; + } + + if (targetLength <= 8) { + throw new IllegalArgumentException("targetLength must be greater than 8"); + } + + HashFunction hashFunction = goodFastHash(32); + String hash = hashFunction.hashUnencodedChars(id).toString(); + return id.substring(0, targetLength - hash.length() - 1) + "-" + hash; + } + + /** + * Generates a generic resource id from a given string, avoiding characters specified in the + * illegalChars Pattern. 
The length of the generated string ID will not exceed the length + * specified by targetLength. + * + * @param baseString the base ID to generate the resource ID from. + * @param illegalChars a pattern of characters to remove from the generated ID. + * @param replaceChar the character to replace all illegal characters with. + * @param targetLength the max length of the generated ID. + * @return the generated resource ID. + */ + public static String generateResourceId( + String baseString, + Pattern illegalChars, + String replaceChar, + int targetLength, + DateTimeFormatter timeFormat) { + // first, make sure the baseString, typically the test ID, is not empty + checkArgument(baseString.length() != 0, "baseString cannot be empty."); + + // next, replace all illegal characters from given string with given replacement character + String illegalCharsRemoved = + illegalChars.matcher(baseString.toLowerCase()).replaceAll(replaceChar); + + // finally, append the date/time and return the substring that does not exceed the length limit + LocalDateTime localDateTime = LocalDateTime.now(ZoneId.of(TIME_ZONE)); + String timeAddOn = localDateTime.format(timeFormat); + return illegalCharsRemoved.subSequence( + 0, min(targetLength - timeAddOn.length() - 1, illegalCharsRemoved.length())) + + replaceChar + + localDateTime.format(timeFormat); + } + + /** Generates random letter for padding. */ + public static char generatePadding() { + Random random = new Random(); + return (char) ('a' + random.nextInt(26)); + } + + /** + * Checks whether the given project ID is valid according to GCP constraints. + * + * @param idToCheck the project ID to check. + * @throws IllegalArgumentException if the project ID is invalid. 
+ */ + public static void checkValidProjectId(String idToCheck) { + if (idToCheck.length() < MIN_PROJECT_ID_LENGTH) { + throw new IllegalArgumentException("Project ID " + idToCheck + " cannot be empty."); + } + if (idToCheck.length() > MAX_PROJECT_ID_LENGTH) { + throw new IllegalArgumentException( + "Project ID " + + idToCheck + + " cannot be longer than " + + MAX_PROJECT_ID_LENGTH + + " characters."); + } + if (ILLEGAL_PROJECT_CHARS.matcher(idToCheck).find()) { + throw new IllegalArgumentException( + "Project ID " + + idToCheck + + " is not a valid ID. Only letters, numbers, hyphens, single quotes, colon, dot and" + + " exclamation points are allowed."); + } + } + + /** + * Cleanup Resources from the given ResourceManagers. It will guarantee that all the cleanups are + * invoked, but still throws / bubbles the first exception at the end if something went wrong. + * + * @param managers Varargs of the managers to clean + */ + public static void cleanResources(ResourceManager... managers) { + + if (managers == null || managers.length == 0) { + return; + } + + Exception bubbleException = null; + + for (ResourceManager manager : managers) { + if (manager == null) { + continue; + } + try { + LOG.info("Cleaning up resource manager {}", manager.getClass().getSimpleName()); + manager.cleanupAll(); + } catch (Exception e) { + LOG.error("Error cleaning the resource manager {}", manager.getClass().getSimpleName()); + if (bubbleException == null) { + bubbleException = e; + } + } + } + + if (bubbleException != null) { + throw new RuntimeException("Error cleaning up resources", bubbleException); + } + } + + public static String generatePassword( Review Comment: This should have some commentary on how it should be used. This is not cryptographically secure, but is probably fine for temporary test envs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
