kennknowles commented on code in PR #26444: URL: https://github.com/apache/beam/pull/26444#discussion_r1178405890
########## it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManagerUtils.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.cassandra; + +import static org.apache.beam.it.common.utils.ResourceManagerUtils.generateResourceId; + +import com.google.re2j.Pattern; Review Comment: Why this over the stuff that ships with Java? It seems basically the same ########## it/common/src/main/java/org/apache/beam/it/common/ResourceManager.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.common; + +/** Common interface across resource managers. */ +public interface ResourceManager { Review Comment: This seems close to `Closeable` or `AutoCloseable`. Would it make sense to use it in that style? ########## it/common/src/main/java/org/apache/beam/it/common/AbstractPipelineLauncher.java: ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.common; Review Comment: "common" is not a great module name: it ends up holding everything. Also, the code in this file is specific to Dataflow, not common.
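To illustrate the `Closeable`/`AutoCloseable` suggestion on `ResourceManager` above, here is a minimal sketch of what that style might look like. It assumes the interface exposes a single cleanup method, named `cleanupAll()` here as a guess; `RecordingResourceManager` and `AutoCloseableSketch` are hypothetical names used only for this illustration and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the PR's ResourceManager; the real interface may differ. */
interface ResourceManager extends AutoCloseable {
  /** Releases every resource this manager created (assumed method name). */
  void cleanupAll();

  /** Lets try-with-resources trigger cleanup; narrowed so it throws no checked exception. */
  @Override
  default void close() {
    cleanupAll();
  }
}

/** Hypothetical in-memory manager that only records the calls it receives. */
final class RecordingResourceManager implements ResourceManager {
  final List<String> events = new ArrayList<>();

  void createTable(String name) {
    events.add("create:" + name);
  }

  @Override
  public void cleanupAll() {
    events.add("cleanup");
  }
}

final class AutoCloseableSketch {
  /** Uses the manager in try-with-resources and returns the recorded call order. */
  static List<String> run() {
    RecordingResourceManager manager = new RecordingResourceManager();
    try (RecordingResourceManager m = manager) {
      m.createTable("t1");
    } // close() runs here, which delegates to cleanupAll()
    return manager.events;
  }
}
```

With this shape, a test that forgets to call cleanup explicitly would still release its resources when the `try` block exits, including on failure.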
########## it/common/src/main/java/org/apache/beam/it/common/AbstractPipelineLauncher.java: ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.common; + +import static org.apache.beam.it.common.logging.LogStrings.formatForLogging; +import static org.apache.beam.it.common.utils.RetryUtil.clientRetryPolicy; + +import com.google.api.client.util.ArrayMap; +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.Environment; +import com.google.api.services.dataflow.model.Job; +import com.google.api.services.dataflow.model.MetricUpdate; +import dev.failsafe.Failsafe; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; Review Comment: Other files use a Google `Pattern` class, but here the one that comes with Java is used. ########## it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.common; + +import static org.apache.beam.it.common.utils.PipelineUtils.createJobName; + +import com.google.api.services.dataflow.model.Job; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; + +/** Client for working with Cloud Dataflow. */ +public interface PipelineLauncher { + /** Enum representing Apache Beam SDKs. */ + enum Sdk { + JAVA("JAVA"), + PYTHON("PYTHON"), + GO("GO"); + + private final String text; + + Sdk(String text) { + this.text = text; + } + + @Override + public String toString() { + return text; + } + } + + /** Enum representing known Dataflow job states. */ + enum JobState { Review Comment: Aren't these already in the SDK? ########## it/common/src/main/java/org/apache/beam/it/common/conditions/ConditionCheck.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.common.conditions; + +import java.util.function.Supplier; +import javax.annotation.Nullable; Review Comment: use checker annotations ########## it/common/src/main/java/org/apache/beam/it/common/package-info.java: ########## @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** Package for managing common ResourceManager resources within integration tests. 
*/ +package org.apache.beam.it.common; Review Comment: just drop the `common` ########## it/common/src/main/java/org/apache/beam/it/common/matchers/package-info.java: ########## @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** Package for Truth matchers / subjects to have reusable assertions. */ +package org.apache.beam.it.common.matchers; Review Comment: Since it is specifically for truth matchers, perhaps `org.apache.beam.it.truthmatchers` or something. It is fine as-is but imagine we had built the framework for Hamcrest and then Truth came out and we wanted to also support it. Separating modules according to the dependencies they pull in is often a good idea. ########## it/mongodb/src/test/java/org/apache/beam/it/mongodb/DefaultMongoDBResourceManagerTest.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.mongodb; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.mongodb.MongoBulkWriteException; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoIterable; +import java.io.IOException; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.bson.Document; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.testcontainers.containers.MongoDBContainer; + +/** Unit tests for {@link DefaultMongoDBResourceManager}. 
*/ +@RunWith(JUnit4.class) +public class DefaultMongoDBResourceManagerTest { + + @Rule public final MockitoRule mockito = MockitoJUnit.rule(); + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private MongoIterable<String> collectionIterable; + + @Mock private MongoClient mongoClient; + @Mock private MongoDatabase database; + @Mock private MongoCollection<Document> collection; + @Mock private MongoCursor<String> collectionNames; + @Mock private MongoDBContainer container; Review Comment: Overall, I'm concerned that the amount of mocking means that this file may create vacuous tests of our assumptions of how things behave, but do not test how they actually might behave. Is there any other way to have a lightweight fake? ########## it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.it.common; + +import static org.apache.beam.it.common.utils.PipelineUtils.createJobName; + +import com.google.api.services.dataflow.model.Job; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; + +/** Client for working with Cloud Dataflow. */ Review Comment: Can this framework support running pipelines more generally, or just Dataflow? ########## it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.it.common; + +import static org.apache.beam.it.common.utils.PipelineUtils.createJobName; + +import com.google.api.services.dataflow.model.Job; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; + +/** Client for working with Cloud Dataflow. */ +public interface PipelineLauncher { + /** Enum representing Apache Beam SDKs. */ + enum Sdk { + JAVA("JAVA"), + PYTHON("PYTHON"), + GO("GO"); + + private final String text; + + Sdk(String text) { + this.text = text; + } + + @Override + public String toString() { + return text; + } + } + + /** Enum representing known Dataflow job states. */ + enum JobState { + UNKNOWN("JOB_STATE_UNKNOWN"), + STOPPED("JOB_STATE_STOPPED"), + RUNNING("JOB_STATE_RUNNING"), + DONE("JOB_STATE_DONE"), + FAILED("JOB_STATE_FAILED"), + CANCELLED("JOB_STATE_CANCELLED"), + UPDATED("JOB_STATE_UPDATED"), + DRAINING("JOB_STATE_DRAINING"), + DRAINED("JOB_STATE_DRAINED"), + PENDING("JOB_STATE_PENDING"), + CANCELLING("JOB_STATE_CANCELLING"), + QUEUED("JOB_STATE_QUEUED"), + RESOURCE_CLEANING_UP("JOB_STATE_RESOURCE_CLEANING_UP"); + + private static final String DATAFLOW_PREFIX = "JOB_STATE_"; + + /** States that indicate the job is getting ready to run. */ + public static final ImmutableSet<JobState> PENDING_STATES = ImmutableSet.of(PENDING, QUEUED); Review Comment: This stuff all seems like it would already be in the DataflowRunner code base, and that is the best place to add it. ########## it/cassandra/src/main/java/org/apache/beam/it/cassandra/DefaultCassandraResourceManager.java: ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DriverTimeoutException; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; Review Comment: Prefer checkerframework annotations since they can go in more places (like on type uses) ########## it/common/src/main/java/org/apache/beam/it/common/PipelineOperator.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.common; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.function.Supplier; +import org.apache.beam.it.common.PipelineLauncher.JobState; +import org.apache.beam.sdk.function.ThrowingConsumer; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for managing Dataflow jobs. */ +public final class PipelineOperator { + + private static final Logger LOG = LoggerFactory.getLogger(PipelineOperator.class); + + /** The result of running an operation. */ + public enum Result { + CONDITION_MET, + LAUNCH_FINISHED, + LAUNCH_FAILED, + TIMEOUT + } + + private final PipelineLauncher client; + + public PipelineOperator(PipelineLauncher client) { + this.client = client; + } + + /** + * Waits until the given job is done, timing out it if runs for too long. + * + * <p>If the job is a batch job, it should complete eventually. If it is a streaming job, this + * will time out unless the job is explicitly cancelled or drained. 
+ * + * @param config the configuration for performing the operation + * @return the result, which will be {@link Result#LAUNCH_FINISHED}, {@link Result#LAUNCH_FAILED} + * or {@link Result#TIMEOUT} + */ + @SuppressWarnings("rawtypes") Review Comment: Prefer to suppress this on a single line that creates the `Supplier[]` and assigns it to a variable with a proper type like `Supplier<Boolean>[]` ########## it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.common; + +import static org.apache.beam.it.common.utils.PipelineUtils.createJobName; + +import com.google.api.services.dataflow.model.Job; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; + +/** Client for working with Cloud Dataflow. */ +public interface PipelineLauncher { + /** Enum representing Apache Beam SDKs. 
*/ + enum Sdk { + JAVA("JAVA"), + PYTHON("PYTHON"), + GO("GO"); + + private final String text; + + Sdk(String text) { + this.text = text; + } + + @Override + public String toString() { + return text; + } + } + + /** Enum representing known Dataflow job states. */ + enum JobState { + UNKNOWN("JOB_STATE_UNKNOWN"), + STOPPED("JOB_STATE_STOPPED"), + RUNNING("JOB_STATE_RUNNING"), + DONE("JOB_STATE_DONE"), + FAILED("JOB_STATE_FAILED"), + CANCELLED("JOB_STATE_CANCELLED"), + UPDATED("JOB_STATE_UPDATED"), + DRAINING("JOB_STATE_DRAINING"), + DRAINED("JOB_STATE_DRAINED"), + PENDING("JOB_STATE_PENDING"), + CANCELLING("JOB_STATE_CANCELLING"), + QUEUED("JOB_STATE_QUEUED"), + RESOURCE_CLEANING_UP("JOB_STATE_RESOURCE_CLEANING_UP"); + + private static final String DATAFLOW_PREFIX = "JOB_STATE_"; + + /** States that indicate the job is getting ready to run. */ + public static final ImmutableSet<JobState> PENDING_STATES = ImmutableSet.of(PENDING, QUEUED); + + /** States that indicate the job is running. */ + public static final ImmutableSet<JobState> ACTIVE_STATES = ImmutableSet.of(RUNNING, UPDATED); + + /** States that indicate that the job is done. */ + public static final ImmutableSet<JobState> DONE_STATES = + ImmutableSet.of(CANCELLED, DONE, DRAINED, STOPPED); + + /** States that indicate that the job has failed. */ + public static final ImmutableSet<JobState> FAILED_STATES = ImmutableSet.of(FAILED); + + /** States that indicate that the job is in the process of finishing. */ + public static final ImmutableSet<JobState> FINISHING_STATES = + ImmutableSet.of(DRAINING, CANCELLING); + + private final String text; + + JobState(String text) { + this.text = text; + } + + /** + * Parses the state from Dataflow. + * + * <p>Always use this in place of valueOf. + */ + public static JobState parse(String fromDataflow) { + return valueOf(fromDataflow.replace(DATAFLOW_PREFIX, "")); + } + + @Override + public String toString() { + return text; + } + } + + /** Config for starting a Dataflow job. 
*/ + class LaunchConfig { + private final String jobName; + private final ImmutableMap<String, String> parameters; + private final ImmutableMap<String, Object> environment; + @Nullable private final String specPath; + @Nullable private final Sdk sdk; + @Nullable private final String executable; + @Nullable private final Pipeline pipeline; + + private LaunchConfig(Builder builder) { + this.jobName = builder.jobName; + this.parameters = ImmutableMap.copyOf(builder.parameters); + this.environment = ImmutableMap.copyOf(builder.environment); + this.specPath = builder.specPath; + this.sdk = builder.sdk; + this.executable = builder.executable; + this.pipeline = builder.pipeline; + } + + public String jobName() { + return jobName; + } + + public ImmutableMap<String, String> parameters() { + return parameters; + } + + public ImmutableMap<String, Object> environment() { + return environment; + } + + @Nullable + public String getParameter(String key) { + return parameters.get(key); + } + + public @Nullable String specPath() { + return specPath; + } + + public @Nullable Sdk sdk() { + return sdk; + } + + public @Nullable String executable() { + return executable; + } + + public @Nullable Pipeline pipeline() { + return pipeline; + } + + public static Builder builderWithName(String jobName, String specPath) { + return new Builder(jobName, specPath); + } + + public static Builder builder(String testName, @Nullable String specPath) { + return new Builder(createJobName(testName), specPath); + } + + public static Builder builder(String jobName) { + return builder(jobName, null); + } + + /** Builder for the {@link LaunchConfig}. 
*/ + public static final class Builder { + private final String jobName; + private final @Nullable String specPath; + private final Map<String, Object> environment; + private Map<String, String> parameters; + private @Nullable Sdk sdk; + private @Nullable String executable; + private @Nullable Pipeline pipeline; + + private Builder(String jobName, @Nullable String specPath) { + this.jobName = jobName; + this.parameters = new HashMap<>(); + this.environment = new HashMap<>(); + this.specPath = specPath; + } + + public String getJobName() { + return jobName; + } + + @Nullable Review Comment: Use checkerframework `@Nullable` annotation and annotate the return type, not the overall method. Do this throughout the file, and all the new code. ########## it/common/src/main/java/org/apache/beam/it/common/matchers/LaunchInfoSubject.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.it.common.matchers; + +import com.google.common.truth.FailureMetadata; +import com.google.common.truth.Subject; +import org.apache.beam.it.common.PipelineLauncher.JobState; +import org.apache.beam.it.common.PipelineLauncher.LaunchInfo; + +/** + * Subject that has assertion operations for {@link LaunchInfo}, which has the information for a + * recently launched pipeline. + */ +public final class LaunchInfoSubject extends Subject { + + private final LaunchInfo actual; + + private LaunchInfoSubject(FailureMetadata metadata, LaunchInfo actual) { + super(metadata, actual); Review Comment: Using `super` constructor just to initialize fields is an anti-pattern. It seems to be built into Truth, which is a bummer. Is this the intended use of `Subject`? Is there a way to use composition instead of inheritance to more cleanly separate this class? ########## it/common/src/main/java/org/apache/beam/it/common/conditions/package-info.java: ########## @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** Package that contains reusable conditions. */ +package org.apache.beam.it.common.conditions; Review Comment: Don't make subpackages of `common`. Common is already "misc" so you can just put `conditions` at the top level. 
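Returning to the `@SuppressWarnings("rawtypes")` comment on `PipelineOperator.java` above: a rough sketch of the narrower suppression could look like the following. `NarrowSuppressionSketch`, `allMet`, and `example` are hypothetical names for illustration and are not taken from the PR; the actual method in `PipelineOperator` may be shaped differently.

```java
import java.util.function.Supplier;

final class NarrowSuppressionSketch {
  /** Returns true only if every supplied condition currently holds. */
  @SafeVarargs
  static boolean allMet(Supplier<Boolean>... conditions) {
    for (Supplier<Boolean> condition : conditions) {
      if (!condition.get()) {
        return false;
      }
    }
    return true;
  }

  static boolean example() {
    // Java forbids generic array creation, so a raw array has to be created.
    // The suppression is scoped to this one declaration, and the variable
    // itself carries the proper type, so the rest of the method stays checked.
    @SuppressWarnings({"rawtypes", "unchecked"})
    Supplier<Boolean>[] conditions = new Supplier[] {() -> true, () -> 1 + 1 == 2};
    return allMet(conditions);
  }
}
```

Scoping the annotation to the declaration means any other raw-type use introduced later in the same method would still produce a warning.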
########## it/common/src/main/java/org/apache/beam/it/common/logging/LogStrings.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.common.logging; Review Comment: Same. Just `org.apache.beam.it.logging` ########## it/mongodb/src/test/java/org/apache/beam/it/mongodb/DefaultMongoDBResourceManagerTest.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.it.mongodb; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.mongodb.MongoBulkWriteException; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoIterable; +import java.io.IOException; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.bson.Document; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.testcontainers.containers.MongoDBContainer; + +/** Unit tests for {@link DefaultMongoDBResourceManager}. 
*/ +@RunWith(JUnit4.class) +public class DefaultMongoDBResourceManagerTest { + + @Rule public final MockitoRule mockito = MockitoJUnit.rule(); + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private MongoIterable<String> collectionIterable; + + @Mock private MongoClient mongoClient; + @Mock private MongoDatabase database; + @Mock private MongoCollection<Document> collection; + @Mock private MongoCursor<String> collectionNames; + @Mock private MongoDBContainer container; + + private static final String TEST_ID = "test-id"; + private static final String COLLECTION_NAME = "collection-name"; + private static final String STATIC_DATABASE_NAME = "database"; + private static final String HOST = "localhost"; + private static final int MONGO_DB_PORT = 27017; + private static final int MAPPED_PORT = 10000; + + private DefaultMongoDBResourceManager testManager; + + @Before + public void setUp() throws IOException, InterruptedException { + when(container.getHost()).thenReturn(HOST); + when(container.getMappedPort(MONGO_DB_PORT)).thenReturn(MAPPED_PORT); + + testManager = + new DefaultMongoDBResourceManager( + mongoClient, container, DefaultMongoDBResourceManager.builder(TEST_ID)); + } + + @Test + public void testCreateResourceManagerBuilderReturnsDefaultMongoDBResourceManager() + throws IOException { + assertThat( + DefaultMongoDBResourceManager.builder(TEST_ID) + .useStaticContainer() + .setHost(HOST) + .setPort(MONGO_DB_PORT) + .build()) + .isInstanceOf(DefaultMongoDBResourceManager.class); + } + + @Test + public void testGetUriShouldReturnCorrectValue() { + assertThat(testManager.getUri()).matches("mongodb://" + HOST + ":" + MAPPED_PORT); + } + + @Test + public void testGetDatabaseNameShouldReturnCorrectValue() { + assertThat(testManager.getDatabaseName()).matches(TEST_ID + "-\\d{8}-\\d{6}-\\d{6}"); Review Comment: All the setup for the test is outside of the test case. You should inline it so that what is being tested is clear. Currently it basically just tests the setup. 
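To illustrate what inlining the setup could look like, here is a rough, dependency-free sketch. `FakeContainer` and `UriManager` are hypothetical stand-ins for the mocked `MongoDBContainer` and the resource manager; they are not classes from this PR, and the real test would keep using Mockito and Truth. The point is only that the host and port appear inside the test body, so the expected URI is visibly derived from them.

```java
/** Sketch only: FakeContainer and UriManager are hypothetical stand-ins, not classes from this PR. */
public class InlineSetupSketch {

  /** Hypothetical stand-in for the mocked container. */
  interface FakeContainer {
    String getHost();

    int getMappedPort(int internalPort);
  }

  /** Hypothetical stand-in for the resource manager under test. */
  static final class UriManager {
    private final String uri;

    UriManager(FakeContainer container) {
      // Mirrors the "mongodb://host:port" format used in the diff above.
      this.uri =
          String.format("mongodb://%s:%d", container.getHost(), container.getMappedPort(27017));
    }

    String getUri() {
      return uri;
    }
  }

  /** Arrange, act, and assert all live in the test body, so the inputs are visible. */
  static void testGetUriUsesHostAndMappedPort() {
    // Arrange: the values the assertion depends on are declared right here.
    FakeContainer container =
        new FakeContainer() {
          @Override
          public String getHost() {
            return "localhost";
          }

          @Override
          public int getMappedPort(int internalPort) {
            return 10000;
          }
        };

    // Act
    String uri = new UriManager(container).getUri();

    // Assert
    if (!uri.equals("mongodb://localhost:10000")) {
      throw new AssertionError("unexpected URI: " + uri);
    }
  }

  public static void main(String[] args) {
    testGetUriUsesHostAndMappedPort();
    System.out.println("ok");
  }
}
```

With this shape, a reader can tell from the test alone which inputs produce which output, instead of having to cross-reference `setUp()` and the class-level constants.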
########## it/common/src/main/java/org/apache/beam/it/common/matchers/ListAccumulator.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.common.matchers; Review Comment: just `org.apache.beam.it.matchers` ########## it/mongodb/src/main/java/org/apache/beam/it/mongodb/DefaultMongoDBResourceManager.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.it.mongodb; + +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import java.io.IOException; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.it.common.testcontainers.TestContainerResourceManager; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Default class for implementation of {@link MongoDBResourceManager} interface. + * + * <p>The class supports one database and multiple collections per database object. A database is + * created when the first collection is created if one has not been created already. + * + * <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time, + * microsecond precision}", with additional formatting. + * + * <p>The class is thread-safe. 
+ */ +public class DefaultMongoDBResourceManager extends TestContainerResourceManager<MongoDBContainer> + implements MongoDBResourceManager { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultMongoDBResourceManager.class); + + private static final String DEFAULT_MONGODB_CONTAINER_NAME = "mongo"; + + // A list of available MongoDB Docker image tags can be found at + // https://hub.docker.com/_/mongo/tags + private static final String DEFAULT_MONGODB_CONTAINER_TAG = "4.0.18"; + + // 27017 is the default port that MongoDB is configured to listen on + private static final int MONGODB_INTERNAL_PORT = 27017; + + private final MongoClient mongoClient; + private final String databaseName; + private final String connectionString; + private final boolean usingStaticDatabase; + + private DefaultMongoDBResourceManager(DefaultMongoDBResourceManager.Builder builder) { + this( + /* mongoClient= */ null, + new MongoDBContainer( + DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)), + builder); + } + + @SuppressWarnings("method.invocation") + @VisibleForTesting + DefaultMongoDBResourceManager( + @Nullable MongoClient mongoClient, + MongoDBContainer container, + DefaultMongoDBResourceManager.Builder builder) { + super(container, builder); + + this.usingStaticDatabase = builder.databaseName != null; + this.databaseName = + usingStaticDatabase + ? builder.databaseName + : MongoDBResourceManagerUtils.generateDatabaseName(builder.testId); + this.connectionString = + String.format("mongodb://%s:%d", this.getHost(), this.getPort(MONGODB_INTERNAL_PORT)); + this.mongoClient = mongoClient == null ? 
MongoClients.create(connectionString) : mongoClient; + } + + public static DefaultMongoDBResourceManager.Builder builder(String testId) throws IOException { + return new DefaultMongoDBResourceManager.Builder(testId); + } + + @Override + public synchronized String getUri() { + return connectionString; + } + + @Override + public synchronized String getDatabaseName() { + return databaseName; + } + + private synchronized MongoDatabase getDatabase() { + try { + return mongoClient.getDatabase(databaseName); + } catch (Exception e) { + throw new MongoDBResourceManagerException( + "Error retrieving database " + databaseName + " from MongoDB.", e); + } + } + + private synchronized boolean collectionExists(String collectionName) { + // Check collection name + MongoDBResourceManagerUtils.checkValidCollectionName(databaseName, collectionName); + + Iterable<String> collectionNames = getDatabase().listCollectionNames(); + for (String name : collectionNames) { + // The Collection already exists in the database, return false. + if (collectionName.equals(name)) { + return true; + } + } + + return false; + } + + @Override + public synchronized boolean createCollection(String collectionName) { + LOG.info("Creating collection using collectionName '{}'.", collectionName); + + try { + // Check to see if the Collection exists + if (collectionExists(collectionName)) { + return false; + } + // The Collection does not exist in the database, create it and return true. + getDatabase().getCollection(collectionName); + } catch (Exception e) { + throw new MongoDBResourceManagerException("Error creating collection.", e); + } + + LOG.info("Successfully created collection {}.{}", databaseName, collectionName); + + return true; + } + + /** + * Helper method to retrieve a MongoCollection with the given name from the database and return + * it. + * + * @param collectionName The name of the MongoCollection. 
+ * @param createCollection A boolean that specifies to create the Collection if it does not exist. + * @return A MongoCollection with the given name. + */ + private MongoCollection<Document> getMongoDBCollection( + String collectionName, boolean createCollection) { + if (!collectionExists(collectionName) && !createCollection) { + throw new MongoDBResourceManagerException( + "Collection " + collectionName + " does not exists in database " + databaseName); + } + + return getDatabase().getCollection(collectionName); + } + + @Override + public synchronized boolean insertDocument(String collectionName, Document document) { + return insertDocuments(collectionName, ImmutableList.of(document)); + } + + @Override + public synchronized boolean insertDocuments(String collectionName, List<Document> documents) { + LOG.info( + "Attempting to write {} documents to {}.{}.", + documents.size(), + databaseName, + collectionName); + + try { + getMongoDBCollection(collectionName, /* createCollection= */ true).insertMany(documents); + } catch (Exception e) { + throw new MongoDBResourceManagerException("Error inserting documents.", e); + } + + LOG.info( + "Successfully wrote {} documents to {}.{}", documents.size(), databaseName, collectionName); + + return true; + } + + @Override + public synchronized FindIterable<Document> readCollection(String collectionName) { + LOG.info("Reading all documents from {}.{}", databaseName, collectionName); + + FindIterable<Document> documents; + try { + documents = getMongoDBCollection(collectionName, /* createCollection= */ false).find(); + } catch (Exception e) { + throw new MongoDBResourceManagerException("Error reading collection.", e); + } + + LOG.info("Successfully loaded documents from {}.{}", databaseName, collectionName); + + return documents; + } + + @Override + public synchronized void cleanupAll() { + LOG.info("Attempting to cleanup MongoDB manager."); + + boolean producedError = false; + + // First, delete the database if it was not given 
as a static argument + try { + if (!usingStaticDatabase) { + mongoClient.getDatabase(databaseName).drop(); + } + } catch (Exception e) { + LOG.error("Failed to delete MongoDB database {}.", databaseName, e); + producedError = true; + } + + // Next, try to close the MongoDB client connection + try { + mongoClient.close(); + } catch (Exception e) { + LOG.error("Failed to delete MongoDB client.", e); + producedError = true; + } + + // Throw Exception at the end if there were any errors + if (producedError) { + throw new MongoDBResourceManagerException( + "Failed to delete resources. Check above for errors."); + } + + super.cleanupAll(); + + LOG.info("MongoDB manager successfully cleaned up."); + } + + /** Builder for {@link DefaultMongoDBResourceManager}. */ + public static final class Builder + extends TestContainerResourceManager.Builder<DefaultMongoDBResourceManager> { + + @SuppressWarnings("initialization.fields.uninitialized") Review Comment: Why? Based on the API here, you should make this nullable, and check for null when you use it. ########## it/mongodb/build.gradle: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.it.mongodb', +) + +description = "Apache Beam :: IT :: MongoDB" +ext.summary = "Integration test utilities for MongoDB." + +dependencies { + implementation project(':it:common') + implementation library.java.testcontainers_base + implementation library.java.testcontainers_mongodb + implementation library.java.vendored_guava_26_0_jre + implementation library.java.slf4j_api + //implementation 'org.mongodb:mongodb-driver-sync:3.12.10' + implementation library.java.mongo_java_driver + implementation 'com.google.re2j:re2j:1.6' Review Comment: Same comment here: why use the Google library instead of `java.util.regex`? There should at least be a good reason for it, stated in a comment.
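On the `re2j` question, here is a minimal sketch of what a JDK-only version could look like. It assumes the pattern is only used to clean up resource names; the regex and the `sanitize` method are hypothetical and not taken from the PR.

```java
import java.util.Locale;
import java.util.regex.Pattern;

/** Sketch only: the pattern and method are hypothetical, not taken from the PR. */
public class JdkRegexSketch {

  // Hypothetical rule: anything outside [a-z0-9-] is treated as illegal in a resource name.
  private static final Pattern ILLEGAL_CHARS = Pattern.compile("[^a-z0-9-]");

  /** Lower-cases the name and replaces each illegal character with a dash. */
  static String sanitize(String name) {
    return ILLEGAL_CHARS.matcher(name.toLowerCase(Locale.ROOT)).replaceAll("-");
  }

  public static void main(String[] args) {
    System.out.println(sanitize("My_Test.ID")); // prints "my-test-id"
  }
}
```

If the reason for `re2j` is its linear-time matching guarantee, that is a legitimate argument, but it should be written down next to the dependency so the choice is not mistaken for an accident.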
