This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7df489cb7c6 add datadog it support (#37517)
7df489cb7c6 is described below

commit 7df489cb7c6e0b806837caacb80fd81fc6c08314
Author: Derrick Williams <[email protected]>
AuthorDate: Wed Feb 11 10:02:10 2026 -0500

    add datadog it support (#37517)
    
    * add datadog it support
    
    * add used undeclared artifacts
    
    * address check style issues
    
    * add jar
    
    * address gemini comments
    
    * remove unused helper
    
    * spotless
    
    * fix imports
---
 it/datadog/build.gradle                            |  50 +++
 .../beam/it/datadog/DatadogClientFactory.java      |  47 +++
 .../apache/beam/it/datadog/DatadogLogEntry.java    | 102 +++++++
 .../beam/it/datadog/DatadogResourceManager.java    | 335 +++++++++++++++++++++
 .../datadog/DatadogResourceManagerException.java   |  30 ++
 .../it/datadog/DatadogResourceManagerUtils.java    |  70 +++++
 .../datadog/conditions/DatadogLogEntriesCheck.java |  91 ++++++
 .../beam/it/datadog/conditions/package-info.java   |  20 ++
 .../beam/it/datadog/matchers/DatadogAsserts.java   |  70 +++++
 .../beam/it/datadog/matchers/package-info.java     |  18 ++
 .../org/apache/beam/it/datadog/package-info.java   |  18 ++
 .../beam/it/datadog/DatadogResourceManagerIT.java  |  77 +++++
 .../it/datadog/DatadogResourceManagerTest.java     | 188 ++++++++++++
 .../datadog/DatadogResourceManagerUtilsTest.java   |  67 +++++
 settings.gradle.kts                                |   1 +
 15 files changed, 1184 insertions(+)

diff --git a/it/datadog/build.gradle b/it/datadog/build.gradle
new file mode 100644
index 00000000000..57888ed76f3
--- /dev/null
+++ b/it/datadog/build.gradle
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+    automaticModuleName: 'org.apache.beam.it.datadog',
+)
+
+description = "Apache Beam :: IT :: Datadog"
+ext.summary = "Integration test utilities for Datadog."
+
+dependencies {
+    implementation project(path: ":it:common")
+    implementation project(path: ":it:testcontainers")
+    implementation project(path: ":it:truthmatchers")
+    implementation project(path: ":it:conditions")
+    implementation "org.testcontainers:mockserver:1.19.7"
+    implementation "org.testcontainers:testcontainers:1.19.7"
+    implementation "org.mock-server:mockserver-client-java:5.10.0"
+    implementation "org.mock-server:mockserver-core:5.10.0"
+
+    implementation "org.apache.httpcomponents:httpclient:4.5.13"
+    implementation "org.apache.httpcomponents:httpcore:4.4.14"
+    implementation library.java.google_code_gson
+
+    implementation library.java.vendored_guava_32_1_2_jre
+    implementation "org.slf4j:slf4j-api:2.0.16"
+    compileOnly library.java.auto_value_annotations
+
+    testImplementation(library.java.truth) {
+        exclude group: 'com.google.guava', module: 'guava'
+    }
+    testImplementation library.java.junit
+    testImplementation library.java.mockito_inline
+}
diff --git 
a/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogClientFactory.java 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogClientFactory.java
new file mode 100644
index 00000000000..08e1ced83d6
--- /dev/null
+++ 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogClientFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.datadog;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.mockserver.client.MockServerClient;
+
+/** Factory for the HTTP and MockServer clients used by {@link DatadogResourceManager}. */
+class DatadogClientFactory {
+  DatadogClientFactory() {}
+
+  /**
+   * Returns an HTTP client that is used to send HTTP messages to Datadog API.
+   *
+   * @return An HTTP client for sending HTTP messages to Datadog API.
+   */
+  CloseableHttpClient getHttpClient() {
+    return HttpClientBuilder.create().disableContentCompression().build();
+  }
+
+  /**
+   * Returns a {@link MockServerClient} for sending requests to a MockServer 
instance.
+   *
+   * @param host the service host.
+   * @param port the service port.
+   * @return A {@link MockServerClient} to retrieve messages from a MockServer 
instance.
+   */
+  MockServerClient getServiceClient(String host, int port) {
+    return new MockServerClient(host, port);
+  }
+}
diff --git 
a/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogLogEntry.java 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogLogEntry.java
new file mode 100644
index 00000000000..7750e82523b
--- /dev/null
+++ b/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogLogEntry.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.datadog;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+
+/** A class for Datadog log entries, copy of DatadogEvent. */
+@AutoValue
+public abstract class DatadogLogEntry {
+
+  public static Builder newBuilder() {
+    return new AutoValue_DatadogLogEntry.Builder();
+  }
+
+  @Nullable
+  public abstract String ddsource();
+
+  @Nullable
+  public abstract String ddtags();
+
+  @Nullable
+  public abstract String hostname();
+
+  @Nullable
+  public abstract String service();
+
+  @Nullable
+  public abstract String message();
+
+  /** A builder class for creating {@link DatadogLogEntry} objects. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    abstract Builder setDdsource(String source);
+
+    abstract Builder setDdtags(String tags);
+
+    abstract Builder setHostname(String hostname);
+
+    abstract Builder setService(String service);
+
+    abstract Builder setMessage(String message);
+
+    abstract String message();
+
+    abstract DatadogLogEntry autoBuild();
+
+    public Builder withSource(String source) {
+      checkNotNull(source, "withSource(source) called with null input.");
+
+      return setDdsource(source);
+    }
+
+    public Builder withTags(String tags) {
+      checkNotNull(tags, "withTags(tags) called with null input.");
+
+      return setDdtags(tags);
+    }
+
+    public Builder withHostname(String hostname) {
+      checkNotNull(hostname, "withHostname(hostname) called with null input.");
+
+      return setHostname(hostname);
+    }
+
+    public Builder withService(String service) {
+      checkNotNull(service, "withService(service) called with null input.");
+
+      return setService(service);
+    }
+
+    public Builder withMessage(String message) {
+      checkNotNull(message, "withMessage(message) called with null input.");
+
+      return setMessage(message);
+    }
+
+    public DatadogLogEntry build() {
+      checkNotNull(message(), "Message is required.");
+
+      return autoBuild();
+    }
+  }
+}
diff --git 
a/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogResourceManager.java
 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogResourceManager.java
new file mode 100644
index 00000000000..2239572c03f
--- /dev/null
+++ 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogResourceManager.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.datadog;
+
+import static 
org.apache.beam.it.datadog.DatadogResourceManagerUtils.generateApiKey;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+import static org.mockserver.model.JsonSchemaBody.jsonSchema;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.it.common.ResourceManager;
+import org.apache.beam.it.testcontainers.TestContainerResourceManager;
+import org.apache.beam.it.testcontainers.TestContainerResourceManagerException;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.http.client.entity.GzipCompressingEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.MediaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MockServerContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Client for managing Datadog resources.
+ *
+ * <p>The class supports one mock Datadog server instance.
+ *
+ * <p>The class is thread-safe.
+ *
+ * <p>Note: The MockServer TestContainer will only run on M1 Macs if the Docker version is >=
+ * 4.16.0 and the "Use Rosetta for x86/amd64 emulation on Apple Silicon" 
setting is enabled.
+ */
+public class DatadogResourceManager extends 
TestContainerResourceManager<MockServerContainer>
+    implements ResourceManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DatadogResourceManager.class);
+  private static final String DEFAULT_MOCKSERVER_CONTAINER_NAME = 
"mockserver/mockserver";
+
+  // A list of available Mockserver Docker image tags can be found at
+  // https://hub.docker.com/r/mockserver/mockserver/tags
+  private static final String DEFAULT_MOCKSERVER_CONTAINER_TAG;
+
+  static {
+    DEFAULT_MOCKSERVER_CONTAINER_TAG =
+        Optional.ofNullable(MockServerClient.class.getPackage())
+            .map(Package::getImplementationVersion)
+            .orElseThrow(
+                () ->
+                    new TestContainerResourceManagerException(
+                        "Could not determine Mockserver container version."));
+  }
+
+  // See: https://docs.datadoghq.com/api/latest/logs/#send-logs
+  private static final String SEND_LOGS_JSON_SCHEMA =
+      "{"
+          + "  \"$schema\": \"http://json-schema.org/draft-04/schema#\","
+          + "  \"type\": \"array\","
+          + "  \"items\": ["
+          + "    {"
+          + "      \"type\": \"object\","
+          + "      \"properties\": {"
+          + "        \"ddsource\": {"
+          + "          \"type\": \"string\""
+          + "        },"
+          + "        \"ddtags\": {"
+          + "          \"type\": \"string\""
+          + "        },"
+          + "        \"hostname\": {"
+          + "          \"type\": \"string\""
+          + "        },"
+          + "        \"message\": {"
+          + "          \"type\": \"string\""
+          + "        },"
+          + "        \"service\": {"
+          + "          \"type\": \"string\""
+          + "        }"
+          + "      },"
+          + "      \"required\": ["
+          + "        \"message\""
+          + "      ]"
+          + "    }"
+          + "  ]"
+          + "}";
+
+  private static final Gson GSON = new Gson();
+
+  private final String apiKey;
+  private final DatadogClientFactory clientFactory;
+
+  private DatadogResourceManager(DatadogResourceManager.Builder builder) {
+    this(
+        new DatadogClientFactory(),
+        new MockServerContainer(
+            
DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
+        builder);
+  }
+
+  @VisibleForTesting
+  DatadogResourceManager(
+      DatadogClientFactory clientFactory,
+      MockServerContainer container,
+      DatadogResourceManager.Builder builder) {
+    super(container, builder);
+
+    this.apiKey = builder.apiKey;
+    this.clientFactory = clientFactory;
+  }
+
+  public static DatadogResourceManager.Builder builder(String testId) {
+    return new DatadogResourceManager.Builder(testId);
+  }
+
+  /** Returns the port to connect to the mock Datadog server. */
+  public int getPort() {
+    return super.getPort(MockServerContainer.PORT);
+  }
+
+  /**
+   * Returns the HTTP endpoint that this mock Datadog server is configured to 
listen on.
+   *
+   * @return the HTTP endpoint.
+   */
+  public String getHttpEndpoint() {
+    return String.format("http://%s:%d", getHost(), getPort());
+  }
+
+  /**
+   * Returns the API endpoint that this mock Datadog server is configured to 
receive events at.
+   *
+   * <p>This will be the HTTP endpoint concatenated with 
<code>'/api/v2/logs'</code>.
+   *
+   * @return the API endpoint.
+   */
+  public String getApiEndpoint() {
+    return getHttpEndpoint() + "/api/v2/logs";
+  }
+
+  /**
+   * Returns the Datadog API key used to connect to this mock Datadog server.
+   *
+   * @return the API key.
+   */
+  public String getApiKey() {
+    return apiKey;
+  }
+
+  /**
+   * Sends the given HTTP event to the mock Datadog server.
+   *
+   * @param event The {@link DatadogLogEntry} to send to the API.
+   * @return True, if the request was successful.
+   */
+  public synchronized boolean sendHttpEvent(DatadogLogEntry event) {
+    return sendHttpEvents(Collections.singletonList(event));
+  }
+
+  /**
+   * Sends the given HTTP events to the mock Datadog server.
+   *
+   * @param events The {@link DatadogLogEntry}s to send to the API.
+   * @return True, if the request was successful.
+   */
+  public synchronized boolean sendHttpEvents(Collection<DatadogLogEntry> 
events) {
+
+    LOG.info("Attempting to send {} events to {}.", events.size(), 
getApiEndpoint());
+
+    // Construct base API request
+    HttpPost httppost = new HttpPost(getApiEndpoint());
+    httppost.addHeader("Content-Encoding", "gzip");
+    httppost.addHeader("Content-Type", "application/json");
+    httppost.addHeader("dd-api-key", apiKey);
+
+    String eventsData = GSON.toJson(events);
+
+    try (CloseableHttpClient httpClient = clientFactory.getHttpClient()) {
+      // Set request data
+      try {
+        httppost.setEntity(new GzipCompressingEntity(new 
StringEntity(eventsData)));
+      } catch (UnsupportedEncodingException e) {
+        throw new DatadogResourceManagerException(
+            "Error setting HTTP message data to " + eventsData, e);
+      }
+
+      // Send request
+      try (CloseableHttpResponse response = httpClient.execute(httppost)) {
+        // Check error code
+        int code = response.getStatusLine().getStatusCode();
+        if (code != 202) {
+          throw new DatadogResourceManagerException(
+              "Received http error code " + code + " sending event.");
+        }
+      } catch (Exception e) {
+        throw new DatadogResourceManagerException("Error sending event.", e);
+      }
+    } catch (IOException e) {
+      throw new DatadogResourceManagerException("Error with HTTP client.", e);
+    }
+
+    LOG.info("Successfully sent {} events.", events.size());
+
+    return true;
+  }
+
+  /**
+   * Return a list of all Datadog entries retrieved from the mock Datadog 
server.
+   *
+   * @return All Datadog entries on the server.
+   */
+  public synchronized List<DatadogLogEntry> getEntries() {
+    MockServerClient serviceClient = clientFactory.getServiceClient(getHost(), 
getPort());
+    LOG.info("Reading events from Datadog");
+
+    List<DatadogLogEntry> results = new ArrayList<>();
+    for (HttpRequest request : 
serviceClient.retrieveRecordedRequests(request())) {
+      String requestBody = request.getBodyAsString();
+
+      List<DatadogLogEntry> events = new ArrayList<>();
+      try {
+        // Parse as a json array
+        JsonArray jsonArray = GSON.fromJson(requestBody, JsonArray.class);
+
+        // For each element, create a DatadogLogEntry
+        for (JsonElement jsonElement : jsonArray) {
+          JsonObject jsonObject = jsonElement.getAsJsonObject();
+          DatadogLogEntry.Builder builder = DatadogLogEntry.newBuilder();
+          if (jsonObject.has("ddsource")) {
+            builder.withSource(jsonObject.get("ddsource").getAsString());
+          }
+          if (jsonObject.has("ddtags")) {
+            builder.withTags(jsonObject.get("ddtags").getAsString());
+          }
+          if (jsonObject.has("hostname")) {
+            builder.withHostname(jsonObject.get("hostname").getAsString());
+          }
+          if (jsonObject.has("service")) {
+            builder.withService(jsonObject.get("service").getAsString());
+          }
+          if (jsonObject.has("message")) {
+            builder.withMessage(jsonObject.get("message").getAsString());
+          }
+          events.add(builder.build());
+        }
+      } catch (Exception e) { // Catch broader exception
+        throw new DatadogResourceManagerException(
+            "Received a request with invalid JSON: " + requestBody, e);
+      }
+
+      results.addAll(events);
+    }
+
+    LOG.info("Successfully retrieved {} results.", results.size());
+    return results;
+  }
+
+  /**
+   * Sets up request definitions the mock Datadog server expects to receive, 
all other requests
+   * return 404.
+   */
+  void acceptRequests() {
+    MockServerClient serviceClient = clientFactory.getServiceClient(getHost(), 
getPort());
+    serviceClient
+        .when(
+            request()
+                .withMethod("POST")
+                .withContentType(MediaType.APPLICATION_JSON)
+                .withHeader("dd-api-key", apiKey)
+                .withHeader("content-encoding", 
"(?i)^(?:identity|gzip|deflate)$")
+                .withBody(jsonSchema(SEND_LOGS_JSON_SCHEMA)))
+        .respond(response().withStatusCode(202));
+  }
+
+  /** Builder for {@link DatadogResourceManager}. */
+  public static final class Builder
+      extends TestContainerResourceManager.Builder<DatadogResourceManager> {
+
+    private String apiKey;
+
+    private Builder(String testId) {
+      super(testId, DEFAULT_MOCKSERVER_CONTAINER_NAME, 
DEFAULT_MOCKSERVER_CONTAINER_TAG);
+      this.apiKey = "";
+    }
+
+    /**
+     * Manually set the Datadog API key to the given key. This key will be 
used by the resource
+     * manager to authenticate with the mock Datadog server.
+     *
+     * @param apiKey the API key for the mock Datadog server.
+     * @return this builder with the API key manually set.
+     */
+    public Builder setApiKey(String apiKey) {
+      this.apiKey = apiKey;
+      return this;
+    }
+
+    @Override
+    public DatadogResourceManager build() {
+      if (apiKey == null || apiKey.isEmpty()) {
+        apiKey = generateApiKey();
+      }
+      DatadogResourceManager manager = new DatadogResourceManager(this);
+      manager.acceptRequests();
+      return manager;
+    }
+  }
+}
diff --git 
a/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogResourceManagerException.java
 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogResourceManagerException.java
new file mode 100644
index 00000000000..86da4c62e61
--- /dev/null
+++ 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogResourceManagerException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.datadog;
+
+/** Custom exception for {@link DatadogResourceManager} implementations. */
+public class DatadogResourceManagerException extends RuntimeException {
+
+  public DatadogResourceManagerException(String errorMessage, Throwable err) {
+    super(errorMessage, err);
+  }
+
+  public DatadogResourceManagerException(String errorMessage) {
+    super(errorMessage);
+  }
+}
diff --git 
a/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogResourceManagerUtils.java
 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogResourceManagerUtils.java
new file mode 100644
index 00000000000..e290bc5775e
--- /dev/null
+++ 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/DatadogResourceManagerUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.datadog;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+/** Utilities for {@link DatadogResourceManager} implementations. */
+public final class DatadogResourceManagerUtils {
+
+  // Datadog event metadata keys
+  private static final String DD_SOURCE_KEY = "ddsource";
+  private static final String DD_TAGS_KEY = "ddtags";
+  private static final String DD_HOSTNAME_KEY = "hostname";
+  private static final String DD_SERVICE_KEY = "service";
+  private static final String DD_MESSAGE_KEY = "message";
+
+  private DatadogResourceManagerUtils() {}
+
+  public static Map<String, Object> datadogEntryToMap(DatadogLogEntry entry) {
+    Map<String, Object> eventMap = new HashMap<>();
+    Optional.ofNullable(entry.ddsource()).ifPresent(v -> 
eventMap.put(DD_SOURCE_KEY, v));
+    Optional.ofNullable(entry.ddtags()).ifPresent(v -> 
eventMap.put(DD_TAGS_KEY, v));
+    Optional.ofNullable(entry.hostname()).ifPresent(v -> 
eventMap.put(DD_HOSTNAME_KEY, v));
+    Optional.ofNullable(entry.service()).ifPresent(v -> 
eventMap.put(DD_SERVICE_KEY, v));
+    Optional.ofNullable(entry.message()).ifPresent(v -> 
eventMap.put(DD_MESSAGE_KEY, v));
+    return eventMap;
+  }
+
+  /**
+   * Generates a secure, valid Datadog API key.
+   *
+   * @return The generated API key.
+   */
+  static String generateApiKey() {
+    String uuid = UUID.randomUUID().toString();
+    char[] chars = uuid.toCharArray();
+    for (int i = 0; i < chars.length; i++) {
+      if (Character.isLetter(chars[i])) {
+        chars[i] = Character.toUpperCase(chars[i]);
+        break;
+      }
+    }
+    String result = new String(chars);
+
+    // In the rare case a UUID has only one letter, it will now be uppercase.
+    // The test requires a lowercase letter, so we add one if missing.
+    if (!result.matches(".*[a-z].*")) {
+      return result + "a";
+    }
+    return result;
+  }
+}
diff --git 
a/it/datadog/src/main/java/org/apache/beam/it/datadog/conditions/DatadogLogEntriesCheck.java
 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/conditions/DatadogLogEntriesCheck.java
new file mode 100644
index 00000000000..8c77c2c80a4
--- /dev/null
+++ 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/conditions/DatadogLogEntriesCheck.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.datadog.conditions;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.it.conditions.ConditionCheck;
+import org.apache.beam.it.datadog.DatadogResourceManager;
+
+/** ConditionCheck to validate if Datadog has received a certain number of events. */
+@AutoValue
+public abstract class DatadogLogEntriesCheck extends ConditionCheck {
+
+  abstract DatadogResourceManager resourceManager();
+
+  abstract Integer minEntries();
+
+  @Nullable
+  abstract Integer maxEntries();
+
+  @Override
+  public String getDescription() {
+    if (maxEntries() != null) {
+      return String.format(
+          "Datadog check if logs have between %d and %d events", minEntries(), 
maxEntries());
+    }
+    return String.format("Datadog check if logs have %d events", minEntries());
+  }
+
+  @Override
+  public CheckResult check() {
+    long totalEvents = resourceManager().getEntries().size();
+
+    if (totalEvents < minEntries()) {
+      return new CheckResult(
+          false, String.format("Expected %d but has only %d", minEntries(), 
totalEvents));
+    }
+    Integer max = maxEntries();
+    if (max != null && totalEvents > max) {
+      return new CheckResult(
+          false, String.format("Expected up to %d but found %d events", max, 
totalEvents));
+    }
+
+    if (max != null) {
+      return new CheckResult(
+          true,
+          String.format(
+              "Expected between %d and %d events and found %d",
+              minEntries(), maxEntries(), totalEvents));
+    }
+
+    return new CheckResult(
+        true, String.format("Expected at least %d events and found %d", 
minEntries(), totalEvents));
+  }
+
+  public static Builder builder(DatadogResourceManager resourceManager) {
+    return new 
AutoValue_DatadogLogEntriesCheck.Builder().setResourceManager(resourceManager);
+  }
+
+  /** Builder for {@link DatadogLogEntriesCheck}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setResourceManager(DatadogResourceManager 
resourceManager);
+
+    public abstract Builder setMinEntries(Integer minEvents);
+
+    public abstract Builder setMaxEntries(Integer maxEvents);
+
+    abstract DatadogLogEntriesCheck autoBuild();
+
+    public DatadogLogEntriesCheck build() {
+      return autoBuild();
+    }
+  }
+}
diff --git 
a/it/datadog/src/main/java/org/apache/beam/it/datadog/conditions/package-info.java
 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/conditions/package-info.java
new file mode 100644
index 00000000000..e2f28a9da65
--- /dev/null
+++ 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/conditions/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Package for Datadog condition checks used within integration tests. */
+package org.apache.beam.it.datadog.conditions;
diff --git 
a/it/datadog/src/main/java/org/apache/beam/it/datadog/matchers/DatadogAsserts.java
 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/matchers/DatadogAsserts.java
new file mode 100644
index 00000000000..33a72a10a0f
--- /dev/null
+++ 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/matchers/DatadogAsserts.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.datadog.matchers;
+
+import static 
org.apache.beam.it.datadog.DatadogResourceManagerUtils.datadogEntryToMap;
+import static 
org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatRecords;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.it.datadog.DatadogLogEntry;
+import org.apache.beam.it.truthmatchers.RecordsSubject;
+
+/** Assert utilities for Datadog tests. */
+public class DatadogAsserts {
+
+  /**
+   * Convert Datadog {@link DatadogLogEntry} to a list of maps.
+   *
+   * @param entries List of 
com.google.cloud.teleport.it.datadog.DatadogLogEntrys to parse
+   * @return List of maps to use in {@link RecordsSubject}
+   */
+  public static List<Map<String, Object>> datadogEntriesToRecords(
+      Collection<DatadogLogEntry> entries) {
+    try {
+      List<Map<String, Object>> records = new ArrayList<>();
+
+      for (DatadogLogEntry entry : entries) {
+        Map<String, Object> converted = datadogEntryToMap(entry);
+        records.add(converted);
+      }
+
+      return records;
+    } catch (Exception e) {
+      throw new RuntimeException("Error converting DatadogLogEntries to 
Records", e);
+    }
+  }
+
+  /**
+   * Creates a {@link RecordsSubject} to assert information within a list of 
records.
+   *
+   * @param entries List of DatadogLogEntrys in Datadog {@link 
DatadogLogEntry} format to use in the
+   *     comparison.
+   * @return Truth Subject to chain assertions.
+   */
+  public static RecordsSubject assertThatDatadogLogEntries(
+      @Nullable Collection<DatadogLogEntry> entries) {
+    if (entries == null) {
+      return assertThatRecords(new ArrayList<>());
+    }
+    return assertThatRecords(datadogEntriesToRecords(entries));
+  }
+}
diff --git 
a/it/datadog/src/main/java/org/apache/beam/it/datadog/matchers/package-info.java
 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/matchers/package-info.java
new file mode 100644
index 00000000000..25d1ebc5aef
--- /dev/null
+++ 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/matchers/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+/** Assert utilities for verifying Datadog log entries within integration tests. */
+package org.apache.beam.it.datadog.matchers;
diff --git 
a/it/datadog/src/main/java/org/apache/beam/it/datadog/package-info.java 
b/it/datadog/src/main/java/org/apache/beam/it/datadog/package-info.java
new file mode 100644
index 00000000000..8bda725c292
--- /dev/null
+++ b/it/datadog/src/main/java/org/apache/beam/it/datadog/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+/** Package for managing Datadog resources within integration tests. */
+package org.apache.beam.it.datadog;
diff --git 
a/it/datadog/src/test/java/org/apache/beam/it/datadog/DatadogResourceManagerIT.java
 
b/it/datadog/src/test/java/org/apache/beam/it/datadog/DatadogResourceManagerIT.java
new file mode 100644
index 00000000000..8d0f6849df2
--- /dev/null
+++ 
b/it/datadog/src/test/java/org/apache/beam/it/datadog/DatadogResourceManagerIT.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.datadog;
+
+import static 
org.apache.beam.it.datadog.matchers.DatadogAsserts.assertThatDatadogLogEntries;
+import static 
org.apache.beam.it.datadog.matchers.DatadogAsserts.datadogEntriesToRecords;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.it.testcontainers.TestContainersIntegrationTest;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for Datadog Resource Managers. */
+@Category(TestContainersIntegrationTest.class)
+@RunWith(JUnit4.class)
+public class DatadogResourceManagerIT {
+  private static final String TEST_ID = "dummy-test";
+  private static final int NUM_EVENTS = 100;
+
+  // NOTE(review): nothing in this class releases the manager after a test; confirm whether a
+  // teardown step is needed.
+  private DatadogResourceManager datadogResourceManager;
+
+  @Before
+  public void setUp() {
+    datadogResourceManager = DatadogResourceManager.builder(TEST_ID).build();
+  }
+
+  /** Sends a batch of entries through the manager and expects to read the same records back. */
+  @Test
+  public void testDefaultDatadogResourceManagerE2E() {
+    // Arrange: every generated entry shares one random source and one random hostname.
+    String ddSource = RandomStringUtils.randomAlphabetic(1, 20);
+    String hostname = RandomStringUtils.randomAlphabetic(1, 20);
+    List<DatadogLogEntry> sent = generateHttpEvents(ddSource, hostname);
+
+    // Act: push the entries, then fetch what the manager reports as received.
+    datadogResourceManager.sendHttpEvents(sent);
+    List<DatadogLogEntry> received = datadogResourceManager.getEntries();
+
+    // Assert: the received records match the sent ones, ignoring order.
+    assertThatDatadogLogEntries(received).hasRecordsUnordered(datadogEntriesToRecords(sent));
+  }
+
+  /**
+   * Builds {@link #NUM_EVENTS} entries with random messages and the given source and hostname.
+   *
+   * @param source value passed to {@code withSource} for every entry
+   * @param hostname value passed to {@code withHostname} for every entry
+   * @return the generated entries
+   */
+  private static List<DatadogLogEntry> generateHttpEvents(String source, String hostname) {
+    List<DatadogLogEntry> generated = new ArrayList<>();
+    while (generated.size() < NUM_EVENTS) {
+      generated.add(
+          DatadogLogEntry.newBuilder()
+              .withMessage(RandomStringUtils.randomAlphabetic(1, 20))
+              .withSource(source)
+              .withHostname(hostname)
+              .build());
+    }
+    return generated;
+  }
+}
diff --git 
a/it/datadog/src/test/java/org/apache/beam/it/datadog/DatadogResourceManagerTest.java
 
b/it/datadog/src/test/java/org/apache/beam/it/datadog/DatadogResourceManagerTest.java
new file mode 100644
index 00000000000..677902528ad
--- /dev/null
+++ 
b/it/datadog/src/test/java/org/apache/beam/it/datadog/DatadogResourceManagerTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.datadog;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockserver.client.ForwardChainExpectation;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.model.HttpRequest;
+import org.testcontainers.containers.MockServerContainer;
+
+/** Unit tests for {@link org.apache.beam.it.datadog.DatadogResourceManager}. */
+@RunWith(JUnit4.class)
+public class DatadogResourceManagerTest {
+
+  @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+
+  @Mock private DatadogClientFactory clientFactory;
+  @Mock private CloseableHttpClient httpClient;
+
+  @Mock private MockServerClient serviceClient;
+
+  @Mock private MockServerContainer container;
+
+  private static final String TEST_ID = "test-id";
+  private static final String HOSTNAME = "localhost";
+  private static final String API_KEY = "token";
+  private static final String MESSAGE = "myEvent";
+
+  // Port exposed inside the mock-server container and the host port it is mapped to in tests.
+  private static final int DEFAULT_DATADOG_HEC_INTERNAL_PORT = MockServerContainer.PORT;
+  private static final int MAPPED_DATADOG_HEC_INTERNAL_PORT = 50000;
+
+  private DatadogResourceManager testManager;
+
+  /** Wires the mocked container, client factory and service client into a fresh manager. */
+  @Before
+  public void setUp() {
+    when(container.getMappedPort(DEFAULT_DATADOG_HEC_INTERNAL_PORT))
+        .thenReturn(MAPPED_DATADOG_HEC_INTERNAL_PORT);
+
+    // withLogConsumer is fluent, so the mock has to hand itself back.
+    doReturn(container).when(container).withLogConsumer(any());
+
+    when(serviceClient.when(any(HttpRequest.class)))
+        .thenReturn(mock(ForwardChainExpectation.class));
+
+    when(clientFactory.getServiceClient(HOSTNAME, MAPPED_DATADOG_HEC_INTERNAL_PORT))
+        .thenReturn(serviceClient);
+
+    testManager =
+        new DatadogResourceManager(
+            clientFactory, container, DatadogResourceManager.builder(TEST_ID));
+  }
+
+  /** The HTTP endpoint is built from the host name and the mapped container port. */
+  @Test
+  public void testGetHttpEndpointReturnsCorrectValue() {
+    assertThat(testManager.getHttpEndpoint())
+        .isEqualTo(String.format("http://%s:%d", HOSTNAME, MAPPED_DATADOG_HEC_INTERNAL_PORT));
+  }
+
+  /** The API endpoint is the HTTP endpoint followed by the {@code /api/v2/logs} path. */
+  @Test
+  public void testGetHecEndpointReturnsCorrectValue() {
+    assertThat(testManager.getApiEndpoint())
+        .isEqualTo(
+            String.format("http://%s:%d/api/v2/logs", HOSTNAME, MAPPED_DATADOG_HEC_INTERNAL_PORT));
+  }
+
+  /** An API key set on the builder is returned unchanged by the manager. */
+  @Test
+  public void testGetHecTokenReturnsCorrectValueWhenSet() {
+    assertThat(
+            new DatadogResourceManager(
+                    clientFactory,
+                    container,
+                    DatadogResourceManager.builder(TEST_ID).setApiKey(API_KEY))
+                .getApiKey())
+        .isEqualTo(API_KEY);
+  }
+
+  /** An {@link IOException} from the HTTP client surfaces as a manager exception. */
+  @Test
+  public void testSendHttpEventsShouldThrowErrorWhenHttpClientFailsToExecuteRequest()
+      throws IOException {
+    DatadogLogEntry event = DatadogLogEntry.newBuilder().withMessage(MESSAGE).build();
+
+    when(clientFactory.getHttpClient()).thenReturn(httpClient);
+    doThrow(IOException.class).when(httpClient).execute(any(HttpPost.class));
+
+    assertThrows(DatadogResourceManagerException.class, () -> testManager.sendHttpEvent(event));
+  }
+
+  /** A 404 status code in the HTTP response surfaces as a manager exception. */
+  @Test
+  public void testSendHttpEventsShouldThrowErrorWhenHttpClientReturnsErrorCode()
+      throws IOException {
+    DatadogLogEntry event = DatadogLogEntry.newBuilder().withMessage(MESSAGE).build();
+
+    // The response is a mock; try-with-resources only scopes the stubbing (presumably kept to
+    // satisfy resource-leak checks).
+    try (CloseableHttpResponse mockResponse =
+        mock(CloseableHttpResponse.class, Answers.RETURNS_DEEP_STUBS)) {
+      when(clientFactory.getHttpClient()).thenReturn(httpClient);
+      when(httpClient.execute(any(HttpPost.class))).thenReturn(mockResponse);
+      when(mockResponse.getStatusLine().getStatusCode()).thenReturn(404);
+    }
+
+    assertThrows(DatadogResourceManagerException.class, () -> testManager.sendHttpEvent(event));
+  }
+
+  /** A 202 response makes {@code sendHttpEvents} return true after a single POST. */
+  @Test
+  public void testSendHttpEventsShouldReturnTrueIfDatadogDoesNotThrowAnyError() throws IOException {
+    DatadogLogEntry event = DatadogLogEntry.newBuilder().withMessage(MESSAGE).build();
+
+    // The response is a mock; try-with-resources only scopes the stubbing (presumably kept to
+    // satisfy resource-leak checks).
+    try (CloseableHttpResponse mockResponse =
+        mock(CloseableHttpResponse.class, Answers.RETURNS_DEEP_STUBS)) {
+      when(clientFactory.getHttpClient()).thenReturn(httpClient);
+      when(httpClient.execute(any(HttpPost.class))).thenReturn(mockResponse);
+      when(mockResponse.getStatusLine().getStatusCode()).thenReturn(202);
+    }
+
+    assertThat(testManager.sendHttpEvents(java.util.Arrays.asList(event, event))).isTrue();
+    // Two entries were passed, yet exactly one request is expected.
+    verify(httpClient).execute(any(HttpPost.class));
+  }
+
+  /** A failure while retrieving recorded requests propagates out of {@code getEntries}. */
+  @Test
+  public void testGetEventsShouldThrowErrorWhenServiceClientFailsToExecuteRequest() {
+    doThrow(RuntimeException.class)
+        .when(serviceClient)
+        .retrieveRecordedRequests(any(HttpRequest.class));
+
+    assertThrows(RuntimeException.class, () -> testManager.getEntries());
+  }
+
+  /** A recorded request whose body is a JSON array of one entry is parsed into that entry. */
+  @Test
+  public void testGetEventsShouldReturnTrueIfDatadogDoesNotThrowAnyError() {
+    String httpRequestBody =
+        "[{\n"
+            + "\"message\": \"message\",\n"
+            + "\"hostname\": \"hostname\",\n"
+            + "\"ddsource\": \"ddsource\"\n"
+            + "}]";
+
+    HttpRequest mockHttpRequest = mock(HttpRequest.class);
+    when(mockHttpRequest.getBodyAsString()).thenReturn(httpRequestBody);
+
+    when(serviceClient.retrieveRecordedRequests(any(HttpRequest.class)))
+        .thenReturn(new HttpRequest[] {mockHttpRequest});
+
+    DatadogLogEntry datadogEvent =
+        DatadogLogEntry.newBuilder()
+            .withMessage("message")
+            .withHostname("hostname")
+            .withSource("ddsource")
+            .build();
+
+    assertThat(testManager.getEntries())
+        .containsExactlyElementsIn(Collections.singletonList(datadogEvent));
+  }
+}
diff --git 
a/it/datadog/src/test/java/org/apache/beam/it/datadog/DatadogResourceManagerUtilsTest.java
 
b/it/datadog/src/test/java/org/apache/beam/it/datadog/DatadogResourceManagerUtilsTest.java
new file mode 100644
index 00000000000..2473e18fa53
--- /dev/null
+++ 
b/it/datadog/src/test/java/org/apache/beam/it/datadog/DatadogResourceManagerUtilsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.it.datadog;
+
+import static com.google.common.truth.Truth.assertThat;
+import static 
org.apache.beam.it.datadog.DatadogResourceManagerUtils.datadogEntryToMap;
+import static 
org.apache.beam.it.datadog.DatadogResourceManagerUtils.generateApiKey;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link DatadogResourceManagerUtils}. */
+@RunWith(JUnit4.class)
+public class DatadogResourceManagerUtilsTest {
+
+  /** Only the fields set on the entry (message and source) appear in the resulting map. */
+  @Test
+  public void testDatadogLogEntryToMapWithValuesSet() {
+    Map<String, Object> expectedFields = new HashMap<>();
+    expectedFields.put("message", "myEvent");
+    expectedFields.put("ddsource", "mySource");
+
+    DatadogLogEntry logEntry =
+        DatadogLogEntry.newBuilder().withMessage("myEvent").withSource("mySource").build();
+
+    assertThat(datadogEntryToMap(logEntry)).containsExactlyEntriesIn(expectedFields);
+  }
+
+  /** Every generated API key contains at least one lower-case and one upper-case character. */
+  @Test
+  public void testGenerateApiKeyMeetsRequirements() {
+    // Keys are generated many times so that a sporadic violation is likely to be caught.
+    for (int attempt = 0; attempt < 10000; attempt++) {
+      int lowerCaseCount = 0;
+      int upperCaseCount = 0;
+
+      for (char ch : generateApiKey().toCharArray()) {
+        if (Character.isLowerCase(ch)) {
+          lowerCaseCount++;
+        } else if (Character.isUpperCase(ch)) {
+          upperCaseCount++;
+        }
+      }
+
+      assertThat(lowerCaseCount).isAtLeast(1);
+      assertThat(upperCaseCount).isAtLeast(1);
+    }
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index add4a9ad216..a37c57d043c 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -106,6 +106,7 @@ include(":playground:kafka-emulator")
 include(":it:cassandra")
 include(":it:common")
 include(":it:conditions")
+include(":it:datadog")
 include(":it:elasticsearch")
 include(":it:google-cloud-platform")
 include(":it:jdbc")

Reply via email to