This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 84b467b45a5 Add Datadog IO (#37319)
84b467b45a5 is described below
commit 84b467b45a532f9d75c372248ec8fc5951a7358d
Author: Derrick Williams <[email protected]>
AuthorDate: Wed Jan 21 11:56:59 2026 -0500
Add Datadog IO (#37319)
* Port the connector and make some improvements
* Fix some publisher issues
* Checkstyle, import, and related cleanups
* Add CHANGES.md entry for Datadog IO
* Update formatting and correct the issue link
* Fix a formatting issue
* Fix comments
* Add a precommit workflow for Datadog IO
* Fix a nullable annotation
---
.github/workflows/README.md | 1 +
.../beam_PreCommit_Java_Datadog_IO_Direct.yml | 120 +++++
CHANGES.md | 10 +-
build.gradle.kts | 1 +
sdks/java/io/datadog/build.gradle | 48 ++
.../apache/beam/sdk/io/datadog/DatadogEvent.java | 97 ++++
.../beam/sdk/io/datadog/DatadogEventCoder.java | 94 ++++
.../beam/sdk/io/datadog/DatadogEventPublisher.java | 330 ++++++++++++
.../sdk/io/datadog/DatadogEventSerializer.java | 45 ++
.../beam/sdk/io/datadog/DatadogEventWriter.java | 521 +++++++++++++++++++
.../org/apache/beam/sdk/io/datadog/DatadogIO.java | 236 +++++++++
.../beam/sdk/io/datadog/DatadogWriteError.java | 77 +++
.../sdk/io/datadog/DatadogWriteErrorCoder.java | 88 ++++
.../apache/beam/sdk/io/datadog/package-info.java | 28 +
.../beam/sdk/io/datadog/DatadogEventCoderTest.java | 65 +++
.../sdk/io/datadog/DatadogEventPublisherTest.java | 176 +++++++
.../sdk/io/datadog/DatadogEventSerializerTest.java | 95 ++++
.../beam/sdk/io/datadog/DatadogEventTest.java | 73 +++
.../sdk/io/datadog/DatadogEventWriterTest.java | 566 +++++++++++++++++++++
.../apache/beam/sdk/io/datadog/DatadogIOTest.java | 172 +++++++
.../sdk/io/datadog/DatadogWriteErrorCoderTest.java | 61 +++
.../beam/sdk/io/datadog/DatadogWriteErrorTest.java | 66 +++
settings.gradle.kts | 1 +
23 files changed, 2966 insertions(+), 5 deletions(-)
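For context, a minimal sketch of writing events with the new connector. The intake URL and API key below are placeholders, and the single hard-coded event is only illustrative:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.datadog.DatadogEvent;
import org.apache.beam.sdk.io.datadog.DatadogEventCoder;
import org.apache.beam.sdk.io.datadog.DatadogIO;
import org.apache.beam.sdk.io.datadog.DatadogWriteError;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class DatadogWriteExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Build a single event; the message is the only required field.
    DatadogEvent event =
        DatadogEvent.newBuilder()
            .withMessage("{\"msg\":\"hello from Beam\"}")
            .build();

    PCollection<DatadogEvent> events =
        pipeline.apply("CreateEvents", Create.of(event).withCoder(DatadogEventCoder.of()));

    // Failed writes come back as DatadogWriteError records for dead-lettering.
    PCollection<DatadogWriteError> errors =
        events.apply(
            "WriteToDatadog",
            DatadogIO.writeBuilder()
                .withUrl("https://http-intake.logs.datadoghq.com") // placeholder endpoint
                .withApiKey("DD_API_KEY_PLACEHOLDER")              // placeholder key
                .build());

    pipeline.run().waitUntilFinish();
  }
}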
diff --git a/.github/workflows/README.md b/.github/workflows/README.md
index 283be9c2b1f..376e3d0af54 100644
--- a/.github/workflows/README.md
+++ b/.github/workflows/README.md
@@ -235,6 +235,7 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour
| [ PreCommit Java Cdap IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Cdap_IO_Direct.yml) | N/A |`Run Java_Cdap_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Cdap_IO_Direct.yml?query=event%3Aschedule) |
| [ PreCommit Java Clickhouse IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Clickhouse_IO_Direct.yml) | N/A |`Run Java_Clickhouse_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Clickhouse_IO_Direct.yml?query=event%3Aschedule) |
| [ PreCommit Java Csv IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Csv_IO_Direct.yml) | N/A |`Run Java_Csv_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Csv_IO_Direct.yml?query=event%3Aschedule) |
+| [ PreCommit Java Datadog IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml) | N/A |`Run Java_Datadog_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml?query=event%3Aschedule) |
| [ PreCommit Java Debezium IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml) | N/A |`Run Java_Debezium_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Debezium_IO_Direct.yml?query=event%3Aschedule) |
| [ PreCommit Java ElasticSearch IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_ElasticSearch_IO_Direct.yml) | N/A |`Run Java_ElasticSearch_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_ElasticSearch_IO_Direct.yml?query [...]
| [ PreCommit Java Examples Dataflow ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Examples_Dataflow.yml) | N/A |`Run Java_Examples_Dataflow PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Examples_Dataflow.yml?query=event%3Aschedule) |
diff --git a/.github/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml
b/.github/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml
new file mode 100644
index 00000000000..08bebf31f6b
--- /dev/null
+++ b/.github/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit Java Datadog IO Direct
+
+on:
+ push:
+ tags: ['v*']
+ branches: ['master', 'release-*']
+ paths:
+ - "sdks/java/io/datadog/**"
+ - ".github/workflows/beam_PreCommit_Java_Datadog_IO_Direct.yml"
+ pull_request_target:
+ branches: ['master', 'release-*']
+ paths:
+ - "sdks/java/io/datadog/**"
+ - 'release/trigger_all_tests.json'
+ - '.github/trigger_files/beam_PreCommit_Java_Datadog_IO_Direct.json'
+ issue_comment:
+ types: [created]
+ schedule:
+ - cron: '15 1/6 * * *'
+ workflow_dispatch:
+
+#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+ actions: write
+ pull-requests: write
+ checks: write
+ contents: read
+ deployments: read
+ id-token: none
+ issues: write
+ discussions: read
+ packages: read
+ pages: read
+ repository-projects: read
+ security-events: read
+ statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+ group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+ cancel-in-progress: true
+
+env:
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
+ GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+ GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+ beam_PreCommit_Java_Datadog_IO_Direct:
+ name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+ strategy:
+ matrix:
+ job_name: ["beam_PreCommit_Java_Datadog_IO_Direct"]
+ job_phrase: ["Run Java_Datadog_IO_Direct PreCommit"]
+ timeout-minutes: 60
+ if: |
+ github.event_name == 'push' ||
+ github.event_name == 'pull_request_target' ||
+ (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
+ github.event_name == 'workflow_dispatch' ||
+ github.event.comment.body == 'Run Java_Datadog_IO_Direct PreCommit'
+ runs-on: [self-hosted, ubuntu-20.04, main]
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup repository
+ uses: ./.github/actions/setup-action
+ with:
+ comment_phrase: ${{ matrix.job_phrase }}
+ github_token: ${{ secrets.GITHUB_TOKEN }}
+ github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+ - name: Setup environment
+ uses: ./.github/actions/setup-environment-action
+ - name: run Datadog IO build script
+ uses: ./.github/actions/gradle-command-self-hosted-action
+ with:
+ gradle-command: :sdks:java:io:datadog:build
+ arguments: |
+ -PdisableSpotlessCheck=true \
+ -PdisableCheckStyle=true \
+ - name: Archive JUnit Test Results
+ uses: actions/upload-artifact@v4
+ if: ${{ !success() }}
+ with:
+ name: JUnit Test Results
+ path: "**/build/reports/tests/"
+ - name: Publish JUnit Test Results
+ uses: EnricoMi/publish-unit-test-result-action@v2
+ if: always()
+ with:
+ commit: '${{ env.prsha || env.GITHUB_SHA }}'
+ comment_mode: ${{ github.event_name == 'issue_comment' && 'always'
|| 'off' }}
+ files: '**/build/test-results/**/*.xml'
+ large_files: true
+ - name: Archive SpotBugs Results
+ uses: actions/upload-artifact@v4
+ if: always()
+ with:
+ name: SpotBugs Results
+ path: '**/build/reports/spotbugs/*.html'
+ - name: Publish SpotBugs Results
+ uses: jwgmeligmeyling/[email protected]
+ if: always()
+ with:
+ name: Publish SpotBugs
+ path: '**/build/reports/spotbugs/*.html'
\ No newline at end of file
diff --git a/CHANGES.md b/CHANGES.md
index 6bc2f938cf3..0b152787b8b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -55,7 +55,6 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->
-
# [2.72.0] - Unreleased
## Highlights
@@ -65,7 +64,7 @@
## I/Os
-* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Add support for Datadog IO (Java) ([#37318](https://github.com/apache/beam/issues/37318)).
## New Features / Improvements
@@ -113,6 +112,7 @@
## Known Issues
+
# [2.70.0] - 2025-12-16
## Highlights
@@ -196,7 +196,7 @@ Now Beam has full support for Milvus integration including
Milvus enrichment and
## Highlights
-* [Python] Prism runner now enabled by default for most Python pipelines using
the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This
may break some tests, see https://github.com/apache/beam/pull/34612 for details
on how to handle issues.
+* (Python) Prism runner now enabled by default for most Python pipelines using
the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This
may break some tests, see https://github.com/apache/beam/pull/34612 for details
on how to handle issues.
## I/Os
@@ -212,7 +212,7 @@ Now Beam has full support for Milvus integration including
Milvus enrichment and
Beam now supports data enrichment capabilities using SQL databases, with
built-in support for:
- Managed PostgreSQL, MySQL, and Microsoft SQL Server instances on CloudSQL
- Unmanaged SQL database instances not hosted on CloudSQL (e.g., self-hosted
or on-premises databases)
-* [Python] Added the `ReactiveThrottler` and `ThrottlingSignaler` classes to
streamline throttling behavior in DoFns, expose throttling mechanisms for users
([#35984](https://github.com/apache/beam/pull/35984))
+* (Python) Added the `ReactiveThrottler` and `ThrottlingSignaler` classes to
streamline throttling behavior in DoFns, expose throttling mechanisms for users
([#35984](https://github.com/apache/beam/pull/35984))
* Added a pipeline option to specify the processing timeout for a single
element by any PTransform (Java/Python/Go)
([#35174](https://github.com/apache/beam/issues/35174)).
- When specified, the SDK harness automatically restarts if an element takes
too long to process. Beam runner may then retry processing of the same work
item.
- Use the `--element_processing_timeout_minutes` option to reduce the chance
of having stalled pipelines due to unexpected cases of slow processing, where
slowness might not happen again if processing of the same element is retried.
@@ -2351,4 +2351,4 @@ Schema Options, it will be removed in version `2.23.0`.
([BEAM-9704](https://iss
## Highlights
-- For versions 2.19.0 and older release notes are available on [Apache Beam
Blog](https://beam.apache.org/blog/).
+- For versions 2.19.0 and older release notes are available on [Apache Beam
Blog](https://beam.apache.org/blog/).
\ No newline at end of file
diff --git a/build.gradle.kts b/build.gradle.kts
index 342fe1ee32f..3ae49afa390 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -344,6 +344,7 @@ tasks.register("javaioPreCommit") {
dependsOn(":sdks:java:io:csv:build")
dependsOn(":sdks:java:io:cdap:build")
dependsOn(":sdks:java:io:clickhouse:build")
+ dependsOn(":sdks:java:io:datadog:build")
dependsOn(":sdks:java:io:debezium:expansion-service:build")
dependsOn(":sdks:java:io:debezium:build")
dependsOn(":sdks:java:io:elasticsearch:build")
diff --git a/sdks/java/io/datadog/build.gradle
b/sdks/java/io/datadog/build.gradle
new file mode 100644
index 00000000000..785d656cead
--- /dev/null
+++ b/sdks/java/io/datadog/build.gradle
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+ automaticModuleName: 'org.apache.beam.sdk.io.datadog'
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Datadog"
+ext.summary = "IO to read and write to Datadog."
+
+dependencies {
+ implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+ implementation project(path: ":sdks:java:core", configuration: "shadow")
+ implementation library.java.vendored_guava_32_1_2_jre
+ implementation library.java.joda_time
+ implementation library.java.slf4j_api
+ implementation library.java.google_http_client
+ implementation library.java.google_code_gson
+ implementation library.java.auto_value_annotations
+ testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+ testImplementation library.java.jupiter_api
+ testRuntimeOnly library.java.jupiter_engine
+ testImplementation library.java.jupiter_params
+ testImplementation library.java.truth
+ testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+ testImplementation project(path: ":sdks:java:io:common")
+ testImplementation group: 'org.mock-server', name: 'mockserver-client-java', version: '5.10.0'
+ testImplementation group: 'org.mock-server', name: 'mockserver-junit-rule', version: '5.10.0'
+ implementation library.java.google_http_client_apache_v2
+ implementation library.java.http_client
+ implementation library.java.http_core
+}
diff --git
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java
new file mode 100644
index 00000000000..80334b5e466
--- /dev/null
+++
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** A class for Datadog events. */
+@AutoValue
+public abstract class DatadogEvent {
+
+ public static Builder newBuilder() {
+ return new AutoValue_DatadogEvent.Builder();
+ }
+
+ public abstract @Nullable String ddsource();
+
+ public abstract @Nullable String ddtags();
+
+ public abstract @Nullable String hostname();
+
+ public abstract @Nullable String service();
+
+ public abstract @Nullable String message();
+
+ /** A builder class for creating {@link DatadogEvent} objects. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ abstract Builder setDdsource(String source);
+
+ abstract Builder setDdtags(String tags);
+
+ abstract Builder setHostname(String hostname);
+
+ abstract Builder setService(String service);
+
+ abstract Builder setMessage(String message);
+
+ abstract String message();
+
+ abstract DatadogEvent autoBuild();
+
+ public Builder withSource(String source) {
+ checkNotNull(source, "withSource(source) called with null input.");
+
+ return setDdsource(source);
+ }
+
+ public Builder withTags(String tags) {
+ checkNotNull(tags, "withTags(tags) called with null input.");
+
+ return setDdtags(tags);
+ }
+
+ public Builder withHostname(String hostname) {
+ checkNotNull(hostname, "withHostname(hostname) called with null input.");
+
+ return setHostname(hostname);
+ }
+
+ public Builder withService(String service) {
+ checkNotNull(service, "withService(service) called with null input.");
+
+ return setService(service);
+ }
+
+ public Builder withMessage(String message) {
+ checkNotNull(message, "withMessage(message) called with null input.");
+
+ return setMessage(message);
+ }
+
+ public DatadogEvent build() {
+ checkNotNull(message(), "Message is required.");
+
+ return autoBuild();
+ }
+ }
+}
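The builder above is the intended way to construct events; a short sketch with illustrative values (only the message is required, the other fields are optional Datadog log attributes):

DatadogEvent event =
    DatadogEvent.newBuilder()
        .withMessage("{\"level\":\"info\",\"msg\":\"payment processed\"}") // required
        .withSource("apache-beam")       // maps to ddsource
        .withTags("env:dev,team:data")   // maps to ddtags
        .withHostname("worker-1")
        .withService("payments")
        .build();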
diff --git
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventCoder.java
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventCoder.java
new file mode 100644
index 00000000000..4e5de996ef5
--- /dev/null
+++
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventCoder.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** A {@link org.apache.beam.sdk.coders.Coder} for {@link DatadogEvent}
objects. */
+public class DatadogEventCoder extends AtomicCoder<DatadogEvent> {
+
+ private static final DatadogEventCoder DATADOG_EVENT_CODER = new
DatadogEventCoder();
+
+ private static final TypeDescriptor<DatadogEvent> TYPE_DESCRIPTOR =
+ new TypeDescriptor<DatadogEvent>() {};
+ private static final StringUtf8Coder STRING_UTF_8_CODER =
StringUtf8Coder.of();
+ private static final NullableCoder<String> STRING_NULLABLE_CODER =
+ NullableCoder.of(STRING_UTF_8_CODER);
+
+ public static DatadogEventCoder of() {
+ return DATADOG_EVENT_CODER;
+ }
+
+ @Override
+ public void encode(DatadogEvent value, OutputStream out) throws IOException {
+ STRING_NULLABLE_CODER.encode(value.ddsource(), out);
+ STRING_NULLABLE_CODER.encode(value.ddtags(), out);
+ STRING_NULLABLE_CODER.encode(value.hostname(), out);
+ STRING_NULLABLE_CODER.encode(value.service(), out);
+ STRING_NULLABLE_CODER.encode(value.message(), out);
+ }
+
+ @Override
+ public DatadogEvent decode(InputStream in) throws IOException {
+ DatadogEvent.Builder builder = DatadogEvent.newBuilder();
+
+ String source = STRING_NULLABLE_CODER.decode(in);
+ if (source != null) {
+ builder.withSource(source);
+ }
+
+ String tags = STRING_NULLABLE_CODER.decode(in);
+ if (tags != null) {
+ builder.withTags(tags);
+ }
+
+ String hostname = STRING_NULLABLE_CODER.decode(in);
+ if (hostname != null) {
+ builder.withHostname(hostname);
+ }
+
+ String service = STRING_NULLABLE_CODER.decode(in);
+ if (service != null) {
+ builder.withService(service);
+ }
+
+ String message = STRING_NULLABLE_CODER.decode(in);
+ if (message != null) {
+ builder.withMessage(message);
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public TypeDescriptor<DatadogEvent> getEncodedTypeDescriptor() {
+ return TYPE_DESCRIPTOR;
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(
+ this, "DatadogEvent can hold arbitrary instances, which may be
non-deterministic.");
+ }
+}
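Because the coder deliberately reports itself as non-deterministic, DatadogEvent values cannot be used as GroupByKey keys with this coder; for regular PCollections it can be attached explicitly. A small sketch, assuming a Pipeline named pipeline and a PCollection<DatadogEvent> named events:

// Attach the coder explicitly when inference cannot find one.
events.setCoder(DatadogEventCoder.of());

// Or register it once on the pipeline's coder registry.
pipeline.getCoderRegistry().registerCoderForClass(DatadogEvent.class, DatadogEventCoder.of());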
diff --git
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisher.java
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisher.java
new file mode 100644
index 00000000000..00a106b2ded
--- /dev/null
+++
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisher.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.http.ByteArrayContent;
+import com.google.api.client.http.GZipEncoding;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpMediaType;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import com.google.api.client.http.apache.v2.ApacheHttpTransport;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Set;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DatadogEventPublisher} is a utility class that helps write {@link
DatadogEvent}s to a
+ * Datadog Logs API endpoint.
+ */
+@AutoValue
+public abstract class DatadogEventPublisher {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DatadogEventPublisher.class);
+
+ private static final int DEFAULT_MAX_CONNECTIONS = 1;
+
+ @VisibleForTesting protected static final String DD_URL_PATH = "api/v2/logs";
+
+ private static final String DD_API_KEY_HEADER = "dd-api-key";
+
+ private static final String DD_ORIGIN_HEADER = "dd-evp-origin";
+ private static final String DD_ORIGIN_DATAFLOW = "dataflow";
+
+ private static final HttpMediaType MEDIA_TYPE =
+ new HttpMediaType("application/json;charset=utf-8");
+
+ private static final String CONTENT_TYPE =
+ Joiner.on('/').join(MEDIA_TYPE.getType(), MEDIA_TYPE.getSubType());
+
+ private static final String HTTPS_PROTOCOL_PREFIX = "https";
+
+ public static Builder newBuilder() {
+ return new AutoValue_DatadogEventPublisher.Builder()
+
.withMaxElapsedMillis(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS);
+ }
+
+ abstract ApacheHttpTransport transport();
+
+ abstract HttpRequestFactory requestFactory();
+
+ abstract GenericUrl genericUrl();
+
+ abstract String apiKey();
+
+ abstract Integer maxElapsedMillis();
+
+ /**
+ * Executes a POST for the list of {@link DatadogEvent} objects into
Datadog's Logs API.
+ *
+ * @param events List of {@link DatadogEvent}s
+ * @return {@link HttpResponse} for the POST.
+ */
+ public HttpResponse execute(List<DatadogEvent> events) throws IOException {
+
+ HttpContent content = getContent(events);
+ HttpRequest request = requestFactory().buildPostRequest(genericUrl(),
content);
+
+ request.setEncoding(new GZipEncoding());
+ request.setUnsuccessfulResponseHandler(
+ new HttpSendLogsUnsuccessfulResponseHandler(getConfiguredBackOff()));
+ request.setIOExceptionHandler(new
HttpBackOffIOExceptionHandler(getConfiguredBackOff()));
+
+ setHeaders(request, apiKey());
+
+ return request.execute();
+ }
+
+ /**
+ * Same as {@link DatadogEventPublisher#execute(List)} but with a single
{@link DatadogEvent}.
+ *
+ * @param event {@link DatadogEvent} object.
+ */
+ public HttpResponse execute(DatadogEvent event) throws IOException {
+ return this.execute(ImmutableList.of(event));
+ }
+
+ /**
+ * Return an {@link ExponentialBackOff} with the right settings.
+ *
+ * @return {@link ExponentialBackOff} object.
+ */
+ @VisibleForTesting
+ protected ExponentialBackOff getConfiguredBackOff() {
+ return new
ExponentialBackOff.Builder().setMaxElapsedTimeMillis(maxElapsedMillis()).build();
+ }
+
+ /** Shuts down the connection manager and releases all resources. */
+ public void close() throws IOException {
+ if (transport() != null) {
+ LOG.info("Closing publisher transport.");
+ transport().shutdown();
+ }
+ }
+
+ /**
+ * Utility method to set http headers into the {@link HttpRequest}.
+ *
+ * @param request {@link HttpRequest} object to add headers to.
+ * @param apiKey Datadog's Logs API key.
+ */
+ private void setHeaders(HttpRequest request, String apiKey) {
+ request.getHeaders().set(DD_API_KEY_HEADER, apiKey);
+ request.getHeaders().set(DD_ORIGIN_HEADER, DD_ORIGIN_DATAFLOW);
+ request.getHeaders().setContentEncoding("gzip");
+ }
+
+ /**
+ * Utility method to marshall a list of {@link DatadogEvent}s into an {@link
HttpContent} object
+ * that can be used to create an {@link HttpRequest}.
+ *
+ * @param events List of {@link DatadogEvent}s
+ * @return {@link HttpContent} that can be used to create an {@link
HttpRequest}.
+ */
+ @VisibleForTesting
+ protected HttpContent getContent(List<DatadogEvent> events) {
+ String payload = DatadogEventSerializer.getPayloadString(events);
+ LOG.debug("Payload content: {}", payload);
+ return ByteArrayContent.fromString(CONTENT_TYPE, payload);
+ }
+
+ static class HttpSendLogsUnsuccessfulResponseHandler implements
HttpUnsuccessfulResponseHandler {
+ /*
+ See: https://docs.datadoghq.com/api/latest/logs/#send-logs
+ 408: Request Timeout, request should be retried after some time
+ 429: Too Many Requests, request should be retried after some time
+ */
+ private static final Set<Integer> RETRYABLE_4XX_CODES =
ImmutableSet.of(408, 429);
+
+ private final Sleeper sleeper = Sleeper.DEFAULT;
+ private final BackOff backOff;
+
+ HttpSendLogsUnsuccessfulResponseHandler(BackOff backOff) {
+ this.backOff = Preconditions.checkNotNull(backOff);
+ }
+
+ @Override
+ public boolean handleResponse(HttpRequest req, HttpResponse res, boolean
supportsRetry)
+ throws IOException {
+ if (!supportsRetry) {
+ return false;
+ }
+
+ boolean is5xxStatusCode = res.getStatusCode() / 100 == 5;
+ boolean isRetryable4xxStatusCode =
RETRYABLE_4XX_CODES.contains(res.getStatusCode());
+ if (is5xxStatusCode || isRetryable4xxStatusCode) {
+ try {
+ return BackOffUtils.next(sleeper, backOff);
+ } catch (InterruptedException exception) {
+ // Mark thread as interrupted since we cannot throw
InterruptedException here.
+ Thread.currentThread().interrupt();
+ }
+ }
+ return false;
+ }
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+ abstract Builder setTransport(ApacheHttpTransport transport);
+
+ abstract ApacheHttpTransport transport();
+
+ abstract Builder setRequestFactory(HttpRequestFactory requestFactory);
+
+ abstract HttpRequestFactory requestFactory();
+
+ abstract Builder setGenericUrl(GenericUrl genericUrl);
+
+ abstract GenericUrl genericUrl();
+
+ abstract Builder setApiKey(String apiKey);
+
+ abstract String apiKey();
+
+ abstract Builder setMaxElapsedMillis(Integer maxElapsedMillis);
+
+ abstract Integer maxElapsedMillis();
+
+ abstract DatadogEventPublisher autoBuild();
+
+ /**
+ * Method to set the Datadog Logs API URL.
+ *
+ * @param url Logs API URL
+ * @return {@link Builder}
+ */
+ public Builder withUrl(String url) throws UnsupportedEncodingException {
+ checkNotNull(url, "withUrl(url) called with null input.");
+ return setGenericUrl(getGenericUrl(url));
+ }
+
+ /**
+ * Method to set the Datadog Logs API key.
+ *
+ * @param apiKey Logs API key.
+ * @return {@link Builder}
+ */
+ public Builder withApiKey(String apiKey) {
+ checkNotNull(apiKey, "withApiKey(apiKey) called with null input.");
+ return setApiKey(apiKey);
+ }
+
+ /**
+ * Method to set the max timeout for {@link ExponentialBackOff}. Otherwise uses the default
+ * setting for {@link ExponentialBackOff}.
+ *
+ * @param maxElapsedMillis max elapsed time in milliseconds for timeout.
+ * @return {@link Builder}
+ */
+ public Builder withMaxElapsedMillis(Integer maxElapsedMillis) {
+ checkNotNull(
+ maxElapsedMillis, "withMaxElapsedMillis(maxElapsedMillis) called
with null input.");
+ return setMaxElapsedMillis(maxElapsedMillis);
+ }
+
+ /**
+ * Validates and builds a {@link DatadogEventPublisher} object.
+ *
+ * @return {@link DatadogEventPublisher}
+ */
+ public DatadogEventPublisher build() throws NoSuchAlgorithmException,
KeyManagementException {
+
+ checkNotNull(apiKey(), "API Key needs to be specified via
withApiKey(apiKey).");
+ checkNotNull(genericUrl(), "URL needs to be specified via
withUrl(url).");
+
+ CloseableHttpClient httpClient = getHttpClient(DEFAULT_MAX_CONNECTIONS);
+
+ setTransport(new ApacheHttpTransport(httpClient));
+ setRequestFactory(transport().createRequestFactory());
+
+ return autoBuild();
+ }
+
+ /**
+ * Utility method to convert a baseUrl into a {@link GenericUrl}.
+ *
+ * @param baseUrl url pointing to the Logs API endpoint.
+ * @return {@link GenericUrl}
+ */
+ private GenericUrl getGenericUrl(String baseUrl) {
+ String url = Joiner.on('/').join(baseUrl, DD_URL_PATH);
+
+ return new GenericUrl(url);
+ }
+
+ /**
+ * Utility method to create a {@link CloseableHttpClient} to make http
POSTs against Datadog's
+ * Logs API.
+ */
+ private CloseableHttpClient getHttpClient(int maxConnections)
+ throws NoSuchAlgorithmException, KeyManagementException {
+
+ HttpClientBuilder builder =
ApacheHttpTransport.newDefaultHttpClientBuilder();
+
+ if (genericUrl().getScheme().equalsIgnoreCase(HTTPS_PROTOCOL_PREFIX)) {
+ LOG.info("SSL connection requested");
+
+ HostnameVerifier hostnameVerifier = new DefaultHostnameVerifier();
+
+ SSLContext sslContext = SSLContextBuilder.create().build();
+
+ SSLConnectionSocketFactory connectionSocketFactory =
+ new SSLConnectionSocketFactory(sslContext, hostnameVerifier);
+ builder.setSSLSocketFactory(connectionSocketFactory);
+ }
+
+ builder.setMaxConnTotal(maxConnections);
+ builder.setDefaultRequestConfig(
+ RequestConfig.custom().setCookieSpec(CookieSpecs.STANDARD).build());
+
+ return builder.build();
+ }
+ }
+}
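The publisher is an internal helper rather than a user-facing API (its builder is package-private); roughly how DatadogEventWriter wires it up during setup(), sketched here with placeholder values (HttpResponse comes from com.google.api.client.http):

// Sketch only: runs inside the org.apache.beam.sdk.io.datadog package.
static void publishOne() throws Exception {
  DatadogEventPublisher publisher =
      DatadogEventPublisher.newBuilder()
          .withUrl("https://http-intake.logs.datadoghq.com") // placeholder endpoint
          .withApiKey("DD_API_KEY_PLACEHOLDER")              // placeholder key
          .build();
  try {
    // POST a single event; batches go through execute(List<DatadogEvent>).
    HttpResponse response =
        publisher.execute(DatadogEvent.newBuilder().withMessage("{\"msg\":\"hello\"}").build());
    response.ignore(); // release the connection
  } finally {
    publisher.close(); // shuts down the underlying Apache transport
  }
}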
diff --git
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializer.java
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializer.java
new file mode 100644
index 00000000000..1a388682729
--- /dev/null
+++
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class DatadogEventSerializer {
+ private static final Gson GSON =
+ new GsonBuilder().setFieldNamingStrategy(f ->
f.getName().toLowerCase()).create();
+
+ private DatadogEventSerializer() {}
+
+ /** Utility method to get payload string from a list of {@link
DatadogEvent}s. */
+ public static String getPayloadString(List<DatadogEvent> events) {
+ return GSON.toJson(events);
+ }
+
+ /** Utility method to get payload string from a {@link DatadogEvent}. */
+ public static String getPayloadString(DatadogEvent event) {
+ return GSON.toJson(event);
+ }
+
+ /** Utility method to get payload size from a string. */
+ public static long getPayloadSize(String payload) {
+ return payload.getBytes(StandardCharsets.UTF_8).length;
+ }
+}
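A quick sketch of how the serializer is used when sizing batches (the events list is assumed to exist):

// Serialize a batch to the JSON payload sent to the Logs API, then measure it in bytes.
String payload = DatadogEventSerializer.getPayloadString(events);
long payloadBytes = DatadogEventSerializer.getPayloadSize(payload);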
diff --git
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventWriter.java
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventWriter.java
new file mode 100644
index 00000000000..6de3a1b86e2
--- /dev/null
+++
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventWriter.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseException;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InetAddresses;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InternetDomainName;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link DoFn} to write {@link DatadogEvent}s to Datadog's Logs API. */
+@AutoValue
+public abstract class DatadogEventWriter
+ extends DoFn<KV<Integer, DatadogEvent>, DatadogWriteError> {
+
+ private static final Integer MIN_BATCH_COUNT = 10;
+ private static final Integer DEFAULT_BATCH_COUNT = 100;
+ private static final Integer MAX_BATCH_COUNT = 1000;
+ private static final Logger LOG =
LoggerFactory.getLogger(DatadogEventWriter.class);
+ private static final long DEFAULT_FLUSH_DELAY = 2;
+ private static final Long MAX_BUFFER_SIZE = 5L * 1000 * 1000; // 5MB
+ private static final Counter INPUT_COUNTER =
+ Metrics.counter(DatadogEventWriter.class, "inbound-events");
+ private static final Counter SUCCESS_WRITES =
+ Metrics.counter(DatadogEventWriter.class, "outbound-successful-events");
+ private static final Counter FAILED_WRITES =
+ Metrics.counter(DatadogEventWriter.class, "outbound-failed-events");
+ private static final Counter INVALID_REQUESTS =
+ Metrics.counter(DatadogEventWriter.class, "http-invalid-requests");
+ private static final Counter SERVER_ERROR_REQUESTS =
+ Metrics.counter(DatadogEventWriter.class, "http-server-error-requests");
+ private static final Counter VALID_REQUESTS =
+ Metrics.counter(DatadogEventWriter.class, "http-valid-requests");
+ private static final Distribution SUCCESSFUL_WRITE_LATENCY_MS =
+ Metrics.distribution(DatadogEventWriter.class,
"successful_write_to_datadog_latency_ms");
+ private static final Distribution UNSUCCESSFUL_WRITE_LATENCY_MS =
+ Metrics.distribution(DatadogEventWriter.class,
"unsuccessful_write_to_datadog_latency_ms");
+ private static final Distribution SUCCESSFUL_WRITE_BATCH_SIZE =
+ Metrics.distribution(DatadogEventWriter.class, "write_to_datadog_batch");
+ private static final Distribution SUCCESSFUL_WRITE_PAYLOAD_SIZE =
+ Metrics.distribution(DatadogEventWriter.class, "write_to_datadog_bytes");
+ private static final String BUFFER_STATE_NAME = "buffer";
+ private static final String COUNT_STATE_NAME = "count";
+ private static final String BUFFER_SIZE_STATE_NAME = "buffer_size";
+ private static final String TIME_ID_NAME = "expiry";
+ private static final Pattern URL_PATTERN =
Pattern.compile("^http(s?)://([^:]+)(:[0-9]+)?$");
+
+ @VisibleForTesting
+ protected static final String INVALID_URL_FORMAT_MESSAGE =
+ "Invalid url format. Url format should match PROTOCOL://HOST[:PORT],
where PORT is optional. "
+ + "Supported Protocols are http and https. eg: http://hostname:8088";
+
+ @StateId(BUFFER_STATE_NAME)
+ private final StateSpec<BagState<DatadogEvent>> buffer = StateSpecs.bag();
+
+ @StateId(COUNT_STATE_NAME)
+ private final StateSpec<ValueState<Long>> count = StateSpecs.value();
+
+ @StateId(BUFFER_SIZE_STATE_NAME)
+ private final StateSpec<ValueState<Long>> bufferSize = StateSpecs.value();
+
+ @TimerId(TIME_ID_NAME)
+ private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private Integer batchCount;
+ private Long maxBufferSize;
+ @Nullable private transient DatadogEventPublisher publisher;
+
+ DatadogEventWriter() {
+ this.batchCount = DEFAULT_BATCH_COUNT;
+ this.maxBufferSize = MAX_BUFFER_SIZE;
+ this.publisher = null;
+ }
+
+ public static Builder newBuilder() {
+ return newBuilder(MIN_BATCH_COUNT);
+ }
+
+ public static Builder newBuilder(@Nullable Integer minBatchCount) {
+ return new AutoValue_DatadogEventWriter.Builder()
+ .setMinBatchCount(MoreObjects.firstNonNull(minBatchCount,
MIN_BATCH_COUNT));
+ }
+
+ @Nullable
+ abstract String url();
+
+ @Nullable
+ abstract String apiKey();
+
+ @Nullable
+ abstract Integer minBatchCount();
+
+ @Nullable
+ abstract Integer inputBatchCount();
+
+ @Nullable
+ abstract Long maxBufferSize();
+
+ @Setup
+ public void setup() {
+
+ final String url = url();
+ if (url == null) {
+ throw new IllegalArgumentException("url is required for writing
events.");
+ }
+ checkArgument(isValidUrlFormat(url), INVALID_URL_FORMAT_MESSAGE);
+ final String apiKey = apiKey();
+ if (apiKey == null) {
+ throw new IllegalArgumentException("API Key is required for writing
events.");
+ }
+
+ batchCount = MoreObjects.firstNonNull(inputBatchCount(),
DEFAULT_BATCH_COUNT);
+ LOG.info("Batch count set to: {}", batchCount);
+
+ maxBufferSize = MoreObjects.firstNonNull(maxBufferSize(), MAX_BUFFER_SIZE);
+ LOG.info("Max buffer size set to: {}", maxBufferSize);
+
+ checkArgument(
+ batchCount >= MoreObjects.firstNonNull(minBatchCount(),
MIN_BATCH_COUNT),
+ "batchCount must be greater than or equal to %s",
+ minBatchCount());
+ checkArgument(
+ batchCount <= MAX_BATCH_COUNT,
+ "batchCount must be less than or equal to %s",
+ MAX_BATCH_COUNT);
+
+ try {
+ DatadogEventPublisher.Builder builder =
+ DatadogEventPublisher.newBuilder().withUrl(url).withApiKey(apiKey);
+
+ publisher = builder.build();
+ } catch (IOException | NoSuchAlgorithmException | KeyManagementException
e) {
+ LOG.error("Error creating HttpEventPublisher: ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<Integer, DatadogEvent> input,
+ OutputReceiver<DatadogWriteError> receiver,
+ BoundedWindow window,
+ @StateId(BUFFER_STATE_NAME) BagState<DatadogEvent> bufferState,
+ @StateId(COUNT_STATE_NAME) ValueState<Long> countState,
+ @StateId(BUFFER_SIZE_STATE_NAME) ValueState<Long> bufferSizeState,
+ @TimerId(TIME_ID_NAME) Timer timer)
+ throws IOException {
+
+ DatadogEvent event = input.getValue();
+ INPUT_COUNTER.inc();
+
+ String eventPayload = DatadogEventSerializer.getPayloadString(event);
+ long eventPayloadSize =
DatadogEventSerializer.getPayloadSize(eventPayload);
+ if (eventPayloadSize > maxBufferSize) {
+ LOG.error(
+ "Error processing event of size {} due to exceeding max buffer
size", eventPayloadSize);
+ DatadogWriteError error =
DatadogWriteError.newBuilder().withPayload(eventPayload).build();
+ receiver.output(error);
+ return;
+ }
+
+ timer.offset(Duration.standardSeconds(DEFAULT_FLUSH_DELAY)).setRelative();
+
+ long count = MoreObjects.<Long>firstNonNull(countState.read(), 0L);
+ long bufferSize = MoreObjects.<Long>firstNonNull(bufferSizeState.read(),
0L);
+ if (bufferSize + eventPayloadSize > maxBufferSize) {
+ LOG.debug("Flushing batch of {} events of size {} due to max buffer
size", count, bufferSize);
+ flush(receiver, bufferState, countState, bufferSizeState);
+
+ count = 0L;
+ bufferSize = 0L;
+ }
+
+ bufferState.add(event);
+
+ count = count + 1L;
+ countState.write(count);
+
+ bufferSize = bufferSize + eventPayloadSize;
+ bufferSizeState.write(bufferSize);
+
+ if (count >= batchCount) {
+ LOG.debug("Flushing batch of {} events of size {} due to batch count",
count, bufferSize);
+ flush(receiver, bufferState, countState, bufferSizeState);
+ }
+ }
+
+ @OnTimer(TIME_ID_NAME)
+ public void onExpiry(
+ OutputReceiver<DatadogWriteError> receiver,
+ @StateId(BUFFER_STATE_NAME) BagState<DatadogEvent> bufferState,
+ @StateId(COUNT_STATE_NAME) ValueState<Long> countState,
+ @StateId(BUFFER_SIZE_STATE_NAME) ValueState<Long> bufferSizeState)
+ throws IOException {
+
+ long count = MoreObjects.<Long>firstNonNull(countState.read(), 0L);
+ long bufferSize = MoreObjects.<Long>firstNonNull(bufferSizeState.read(),
0L);
+
+ if (count > 0) {
+ LOG.debug("Flushing batch of {} events of size {} due to timer", count,
bufferSize);
+ flush(receiver, bufferState, countState, bufferSizeState);
+ }
+ }
+
+ @Teardown
+ public void tearDown() {
+ if (this.publisher != null) {
+ try {
+ this.publisher.close();
+ LOG.info("Successfully closed HttpEventPublisher");
+
+ } catch (IOException e) {
+ LOG.warn("Received exception while closing HttpEventPublisher: ", e);
+ }
+ }
+ }
+
+ /**
+ * Utility method to flush a batch of events via {@link
DatadogEventPublisher}.
+ *
+ * @param receiver Receiver to write {@link DatadogWriteError}s to
+ */
+ private void flush(
+ OutputReceiver<DatadogWriteError> receiver,
+ @StateId(BUFFER_STATE_NAME) BagState<DatadogEvent> bufferState,
+ @StateId(COUNT_STATE_NAME) ValueState<Long> countState,
+ @StateId(BUFFER_SIZE_STATE_NAME) ValueState<Long> bufferSizeState)
+ throws IOException {
+
+ if (!bufferState.isEmpty().read()) {
+
+ long count = MoreObjects.firstNonNull(countState.read(), 0L);
+ long bufferSize = MoreObjects.firstNonNull(bufferSizeState.read(), 0L);
+ HttpResponse response = null;
+ List<DatadogEvent> events = Lists.newArrayList(bufferState.read());
+ long startTime = System.nanoTime();
+ try {
+ // Important to close this response to avoid connection leak.
+ response = checkNotNull(publisher).execute(events);
+ if (!response.isSuccessStatusCode()) {
+ UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime()
- startTime));
+ FAILED_WRITES.inc(count);
+ int statusCode = response.getStatusCode();
+ if (statusCode >= 400 && statusCode < 500) {
+ INVALID_REQUESTS.inc();
+ } else if (statusCode >= 500 && statusCode < 600) {
+ SERVER_ERROR_REQUESTS.inc();
+ }
+
+ logWriteFailures(
+ count,
+ response.getStatusCode(),
+ response.parseAsString(),
+ response.getStatusMessage());
+ flushWriteFailures(
+ events, response.getStatusMessage(), response.getStatusCode(),
receiver);
+
+ } else {
+ SUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() -
startTime));
+ SUCCESS_WRITES.inc(count);
+ VALID_REQUESTS.inc();
+ SUCCESSFUL_WRITE_BATCH_SIZE.update(count);
+ SUCCESSFUL_WRITE_PAYLOAD_SIZE.update(bufferSize);
+
+ LOG.debug("Successfully wrote {} events", count);
+ }
+
+ } catch (HttpResponseException e) {
+ UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() -
startTime));
+ FAILED_WRITES.inc(count);
+ int statusCode = e.getStatusCode();
+ if (statusCode >= 400 && statusCode < 500) {
+ INVALID_REQUESTS.inc();
+ } else if (statusCode >= 500 && statusCode < 600) {
+ SERVER_ERROR_REQUESTS.inc();
+ }
+
+ logWriteFailures(count, e.getStatusCode(), e.getContent(),
e.getStatusMessage());
+ flushWriteFailures(events, e.getStatusMessage(), e.getStatusCode(),
receiver);
+
+ } catch (IOException ioe) {
+ UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() -
startTime));
+ FAILED_WRITES.inc(count);
+ INVALID_REQUESTS.inc();
+
+ logWriteFailures(count, 0, ioe.getMessage(), null);
+ flushWriteFailures(events, ioe.getMessage(), null, receiver);
+
+ } finally {
+ // States are cleared regardless of write success or failure since we
+ // write failed events to an output PCollection.
+ bufferState.clear();
+ countState.clear();
+ bufferSizeState.clear();
+
+ // We've observed cases where errors at this point can cause the
pipeline to keep retrying
+ // the same events over and over (e.g. from Dataflow Runner's Pub/Sub
implementation). Since
+ // the events have either been published or wrapped for error
handling, we can safely
+ // ignore this error, though there may or may not be a leak of some
type depending on
+ // HttpResponse's implementation. However, any potential leak would
still happen if we let
+ // the exception fall through, so this isn't considered a major issue.
+ try {
+ if (response != null) {
+ response.ignore();
+ }
+ } catch (IOException e) {
+ LOG.warn(
+ "Error ignoring response from Datadog. Messages should still
have published, but there"
+ + " might be a connection leak.",
+ e);
+ }
+ }
+ }
+ }
+
+ /** Utility method to log write failures. */
+ private void logWriteFailures(
+ long count, int statusCode, @Nullable String content, @Nullable String
statusMessage) {
+ LOG.error("Failed to write {} events", count);
+ LOG.error(
+ "Error writing to Datadog. StatusCode: {}, content: {}, StatusMessage:
{}",
+ statusCode,
+ content,
+ statusMessage);
+ }
+
+ /**
+ * Utility method to un-batch and flush failed write events.
+ *
+ * @param events List of {@link DatadogEvent}s to un-batch
+ * @param statusMessage Status message to be added to {@link
DatadogWriteError}
+ * @param statusCode Status code to be added to {@link DatadogWriteError}
+ * @param receiver Receiver to write {@link DatadogWriteError}s to
+ */
+ private void flushWriteFailures(
+ List<DatadogEvent> events,
+ @Nullable String statusMessage,
+ @Nullable Integer statusCode,
+ OutputReceiver<DatadogWriteError> receiver) {
+
+ checkNotNull(events, "DatadogEvents cannot be null.");
+
+ DatadogWriteError.Builder builder = DatadogWriteError.newBuilder();
+
+ if (statusMessage != null) {
+ builder.withStatusMessage(statusMessage);
+ }
+
+ if (statusCode != null) {
+ builder.withStatusCode(statusCode);
+ }
+
+ for (DatadogEvent event : events) {
+ String payload = DatadogEventSerializer.getPayloadString(event);
+ DatadogWriteError error = builder.withPayload(payload).build();
+ receiver.output(error);
+ }
+ }
+
+ /**
+ * Checks whether the Logs API URL matches the format PROTOCOL://HOST[:PORT].
+ *
+ * @param url for Logs API
+ * @return true if the URL is valid
+ */
+ private static boolean isValidUrlFormat(@Nullable String url) {
+ if (url == null) {
+ return false;
+ }
+ Matcher matcher = URL_PATTERN.matcher(url);
+ if (matcher.find()) {
+ String host = matcher.group(2);
+ if (host == null) {
+ return false;
+ }
+ return InetAddresses.isInetAddress(host) ||
InternetDomainName.isValid(host);
+ }
+ return false;
+ }
+
+ /**
+ * Converts Nanoseconds to Milliseconds.
+ *
+ * @param ns time in nanoseconds
+ * @return time in milliseconds
+ */
+ private static long nanosToMillis(long ns) {
+ return Math.round(((double) ns) / 1e6);
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+ abstract Builder setUrl(String url);
+
+ abstract String url();
+
+ abstract Builder setApiKey(String apiKey);
+
+ abstract String apiKey();
+
+ abstract Builder setMinBatchCount(Integer minBatchCount);
+
+ abstract Integer minBatchCount();
+
+ abstract Builder setInputBatchCount(@Nullable Integer inputBatchCount);
+
+ abstract Builder setMaxBufferSize(Long maxBufferSize);
+
+ abstract DatadogEventWriter autoBuild();
+
+ /**
+ * Method to set the url for Logs API.
+ *
+ * @param url for Logs API
+ * @return {@link Builder}
+ */
+ public Builder withUrl(String url) {
+ checkArgument(url != null, "withURL(url) called with null input.");
+ checkArgument(isValidUrlFormat(url), INVALID_URL_FORMAT_MESSAGE);
+ return setUrl(url);
+ }
+
+ /**
+ * Method to set the API key for Logs API.
+ *
+ * @param apiKey API key for Logs API
+ * @return {@link Builder}
+ */
+ public Builder withApiKey(String apiKey) {
+ checkArgument(apiKey != null, "withApiKey(apiKey) called with null
input.");
+ return setApiKey(apiKey);
+ }
+
+ /**
+ * Method to set the inputBatchCount.
+ *
+ * @param inputBatchCount for batching post requests.
+ * @return {@link Builder}
+ */
+ public Builder withInputBatchCount(@Nullable Integer inputBatchCount) {
+ if (inputBatchCount != null) {
+ checkArgument(
+ inputBatchCount >= MoreObjects.firstNonNull(minBatchCount(),
MIN_BATCH_COUNT),
+ "inputBatchCount must be greater than or equal to %s",
+ minBatchCount());
+ checkArgument(
+ inputBatchCount <= MAX_BATCH_COUNT,
+ "inputBatchCount must be less than or equal to %s",
+ MAX_BATCH_COUNT);
+ }
+ return setInputBatchCount(inputBatchCount);
+ }
+
+ /**
+ * Method to set the maxBufferSize.
+ *
+ * @param maxBufferSize for batching post requests.
+ * @return {@link Builder}
+ */
+ public Builder withMaxBufferSize(@Nullable Long maxBufferSize) {
+ if (maxBufferSize == null) {
+ return setMaxBufferSize(MAX_BUFFER_SIZE);
+ }
+ return setMaxBufferSize(maxBufferSize);
+ }
+
+ /** Builds a new {@link DatadogEventWriter} object based on the configuration. */
+ public DatadogEventWriter build() {
+ checkNotNull(url(), "url needs to be provided.");
+ checkNotNull(apiKey(), "apiKey needs to be provided.");
+
+ return autoBuild();
+ }
+ }
+}
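DatadogEventWriter is a stateful DoFn keyed by an Integer shard, so it is normally applied through DatadogIO.Write, which assigns the keys. Applied directly it would look roughly like this (url and apiKey are placeholders; keyedEvents is an assumed PCollection<KV<Integer, DatadogEvent>>):

PCollection<DatadogWriteError> errors =
    keyedEvents
        .apply(
            "WriteDatadogEvents",
            ParDo.of(
                DatadogEventWriter.newBuilder()
                    .withUrl("https://http-intake.logs.datadoghq.com")
                    .withApiKey("DD_API_KEY_PLACEHOLDER")
                    .withInputBatchCount(100)
                    .build()))
        .setCoder(DatadogWriteErrorCoder.of());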
diff --git
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogIO.java
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogIO.java
new file mode 100644
index 00000000000..fa8b6befaba
--- /dev/null
+++
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogIO.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DatadogIO} class provides a {@link PTransform} that allows writing {@link
+ * DatadogEvent} messages into a Datadog Logs API endpoint.
+ */
+public class DatadogIO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DatadogIO.class);
+
+ private DatadogIO() {}
+
+ public static Write.Builder writeBuilder() {
+ return writeBuilder(null);
+ }
+
+ public static Write.Builder writeBuilder(@Nullable Integer minBatchCount) {
+ return new
AutoValue_DatadogIO_Write.Builder().setMinBatchCount(minBatchCount);
+ }
+
+ /**
+   * Class {@link Write} provides a {@link PTransform} that allows writing {@link DatadogEvent}
+   * records to a Datadog Logs API endpoint using HTTP POST requests. In the event of an error, a
+   * {@link PCollection} of {@link DatadogWriteError} records is returned for further processing
+   * or storage in a dead-letter sink.
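+   *
+   * <p>A minimal usage sketch; the endpoint, API key, and batch count shown below are
+   * illustrative placeholders rather than values shipped with this connector:
+   *
+   * <pre>{@code
+   * PCollection<DatadogEvent> events = ...;
+   * PCollection<DatadogWriteError> errors =
+   *     events.apply(
+   *         "Write to Datadog",
+   *         DatadogIO.writeBuilder()
+   *             .withUrl("https://http-intake.logs.datadoghq.com")
+   *             .withApiKey("your-datadog-api-key")
+   *             .withBatchCount(100)
+   *             .build());
+   * }</pre>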
+ */
+ @AutoValue
+ public abstract static class Write
+ extends PTransform<PCollection<DatadogEvent>,
PCollection<DatadogWriteError>> {
+
+ abstract String url();
+
+ abstract String apiKey();
+
+ @Nullable
+ abstract Integer minBatchCount();
+
+ @Nullable
+ abstract Integer batchCount();
+
+ @Nullable
+ abstract Long maxBufferSize();
+
+ @Nullable
+ abstract Integer parallelism();
+
+ @Override
+ public PCollection<DatadogWriteError> expand(PCollection<DatadogEvent>
input) {
+
+ LOG.info("Configuring DatadogEventWriter.");
+ DatadogEventWriter.Builder builder =
+ DatadogEventWriter.newBuilder(minBatchCount())
+ .withMaxBufferSize(maxBufferSize())
+ .withUrl(url())
+ .withInputBatchCount(batchCount())
+ .withApiKey(apiKey());
+
+ DatadogEventWriter writer = builder.build();
+ LOG.info("DatadogEventWriter configured");
+
+ // Return a PCollection<DatadogWriteError>
+ return input
+ .apply("Create KV pairs", CreateKeys.of(parallelism()))
+ .apply("Write Datadog events", ParDo.of(writer))
+ .setCoder(DatadogWriteErrorCoder.of());
+ }
+
+ /** A builder for creating {@link Write} objects. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ abstract Builder setUrl(String url);
+
+ abstract String url();
+
+ abstract Builder setApiKey(String apiKey);
+
+ abstract String apiKey();
+
+ abstract Builder setMinBatchCount(@Nullable Integer minBatchCount);
+
+ abstract Builder setBatchCount(Integer batchCount);
+
+ abstract Builder setMaxBufferSize(Long maxBufferSize);
+
+ abstract Builder setParallelism(Integer parallelism);
+
+ abstract Write autoBuild();
+
+ /**
+ * Method to set the url for Logs API.
+ *
+ * @param url for Logs API
+ * @return {@link Builder}
+ */
+ public Builder withUrl(String url) {
+ checkArgument(url != null, "withURL(url) called with null input.");
+ return setUrl(url);
+ }
+
+ /**
+ * Method to set the API key for Logs API.
+ *
+ * @param apiKey API key for Logs API
+ * @return {@link Builder}
+ */
+ public Builder withApiKey(String apiKey) {
+ checkArgument(apiKey != null, "withApiKey(apiKey) called with null
input.");
+ return setApiKey(apiKey);
+ }
+
+ /**
+ * Method to set the Batch Count.
+ *
+ * @param batchCount for batching post requests.
+ * @return {@link Builder}
+ */
+ public Builder withBatchCount(Integer batchCount) {
+ checkArgument(batchCount != null, "withBatchCount(batchCount) called
with null input.");
+ return setBatchCount(batchCount);
+ }
+
+ /**
+ * Method to set the Max Buffer Size.
+ *
+ * @param maxBufferSize for batching post requests.
+ * @return {@link Builder}
+ */
+ public Builder withMaxBufferSize(Long maxBufferSize) {
+ checkArgument(
+ maxBufferSize != null, "withMaxBufferSize(maxBufferSize) called
with null input.");
+ return setMaxBufferSize(maxBufferSize);
+ }
+
+ /**
+ * Method to set the parallelism.
+ *
+ * @param parallelism for controlling the number of http client
connections.
+ * @return {@link Builder}
+ */
+ public Builder withParallelism(Integer parallelism) {
+ checkArgument(parallelism != null, "withParallelism(parallelism)
called with null input.");
+ return setParallelism(parallelism);
+ }
+
+ public Write build() {
+ checkNotNull(url(), "Logs API url is required.");
+ checkNotNull(apiKey(), "API key is required.");
+
+ return autoBuild();
+ }
+ }
+
+ private static class CreateKeys
+ extends PTransform<PCollection<DatadogEvent>, PCollection<KV<Integer,
DatadogEvent>>> {
+
+ private static final Integer DEFAULT_PARALLELISM = 1;
+
+ @Nullable private Integer requestedKeys;
+
+ private CreateKeys(@Nullable Integer requestedKeys) {
+ this.requestedKeys = requestedKeys;
+ }
+
+ static CreateKeys of(@Nullable Integer requestedKeys) {
+ return new CreateKeys(requestedKeys);
+ }
+
+ @Override
+ public PCollection<KV<Integer, DatadogEvent>>
expand(PCollection<DatadogEvent> input) {
+
+ return input
+ .apply("Inject Keys", ParDo.of(new
CreateKeysFn(this.requestedKeys)))
+ .setCoder(KvCoder.of(BigEndianIntegerCoder.of(),
DatadogEventCoder.of()));
+ }
+
+ private static class CreateKeysFn extends DoFn<DatadogEvent, KV<Integer,
DatadogEvent>> {
+
+ @Nullable private Integer specifiedParallelism;
+ private Integer calculatedParallelism;
+
+ CreateKeysFn(@Nullable Integer specifiedParallelism) {
+ this.specifiedParallelism = specifiedParallelism;
+ this.calculatedParallelism =
+ MoreObjects.firstNonNull(specifiedParallelism,
DEFAULT_PARALLELISM);
+ LOG.info("Parallelism set to: {}", calculatedParallelism);
+ }
+
+ @Setup
+ public void setup() {
+ // Initialization is now in the constructor to satisfy static
analysis.
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
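+        // Assign a random key in [0, calculatedParallelism) so events fan out across that many
+        // downstream writer instances.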
+ context.output(
+
KV.of(ThreadLocalRandom.current().nextInt(calculatedParallelism),
context.element()));
+ }
+ }
+ }
+ }
+}
diff --git
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteError.java
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteError.java
new file mode 100644
index 00000000000..977873718c6
--- /dev/null
+++
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteError.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+
+/** A class for capturing errors writing {@link DatadogEvent}s to Datadog's
Logs API. */
+@AutoValue
+public abstract class DatadogWriteError {
+
+ public static Builder newBuilder() {
+ return new AutoValue_DatadogWriteError.Builder();
+ }
+
+ @Nullable
+ public abstract Integer statusCode();
+
+ @Nullable
+ public abstract String statusMessage();
+
+ @Nullable
+ public abstract String payload();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+ abstract Builder setStatusCode(Integer statusCode);
+
+ abstract Integer statusCode();
+
+ abstract Builder setStatusMessage(String statusMessage);
+
+ abstract Builder setPayload(String payload);
+
+ abstract DatadogWriteError autoBuild();
+
+ public Builder withStatusCode(Integer statusCode) {
+ checkNotNull(statusCode, "withStatusCode(statusCode) called with null
input.");
+
+ return setStatusCode(statusCode);
+ }
+
+ public Builder withStatusMessage(String statusMessage) {
+ checkNotNull(statusMessage, "withStatusMessage(statusMessage) called
with null input.");
+
+ return setStatusMessage(statusMessage);
+ }
+
+ public Builder withPayload(String payload) {
+ checkNotNull(payload, "withPayload(payload) called with null input.");
+
+ return setPayload(payload);
+ }
+
+ public DatadogWriteError build() {
+ return autoBuild();
+ }
+ }
+}
diff --git
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoder.java
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoder.java
new file mode 100644
index 00000000000..a634c798518
--- /dev/null
+++
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** A {@link org.apache.beam.sdk.coders.Coder} for {@link DatadogWriteError}
objects. */
+public class DatadogWriteErrorCoder extends AtomicCoder<DatadogWriteError> {
+
+ private static final DatadogWriteErrorCoder DATADOG_WRITE_ERROR_CODER =
+ new DatadogWriteErrorCoder();
+
+ private static final TypeDescriptor<DatadogWriteError> TYPE_DESCRIPTOR =
+ new TypeDescriptor<DatadogWriteError>() {};
+ private static final StringUtf8Coder STRING_UTF_8_CODER =
StringUtf8Coder.of();
+ private static final NullableCoder<String> STRING_NULLABLE_CODER =
+ NullableCoder.of(STRING_UTF_8_CODER);
+ private static final NullableCoder<Integer> INTEGER_NULLABLE_CODER =
+ NullableCoder.of(BigEndianIntegerCoder.of());
+
+ public static DatadogWriteErrorCoder of() {
+ return DATADOG_WRITE_ERROR_CODER;
+ }
+
+ @Override
+ public void encode(DatadogWriteError value, OutputStream out) throws
CoderException, IOException {
+ INTEGER_NULLABLE_CODER.encode(value.statusCode(), out);
+ STRING_NULLABLE_CODER.encode(value.statusMessage(), out);
+ STRING_NULLABLE_CODER.encode(value.payload(), out);
+ }
+
+ @Override
+ public DatadogWriteError decode(InputStream in) throws CoderException,
IOException {
+
+ DatadogWriteError.Builder builder = DatadogWriteError.newBuilder();
+
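+    // Each field is encoded with a nullable coder; only populate the builder when a value is
+    // present, since the with* setters reject null arguments.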
+ Integer statusCode = INTEGER_NULLABLE_CODER.decode(in);
+ if (statusCode != null) {
+ builder.withStatusCode(statusCode);
+ }
+
+ String statusMessage = STRING_NULLABLE_CODER.decode(in);
+ if (statusMessage != null) {
+ builder.withStatusMessage(statusMessage);
+ }
+
+ String payload = STRING_NULLABLE_CODER.decode(in);
+ if (payload != null) {
+ builder.withPayload(payload);
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public TypeDescriptor<DatadogWriteError> getEncodedTypeDescriptor() {
+ return TYPE_DESCRIPTOR;
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(
+ this, "DatadogWriteError can hold arbitrary instances, which may be
non-deterministic.");
+ }
+}
diff --git
a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/package-info.java
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/package-info.java
new file mode 100644
index 00000000000..fbeed9f1a55
--- /dev/null
+++
b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for writing to <a href="https://www.datadoghq.com/">Datadog</a>.
+ *
+ * <p>The {@link org.apache.beam.sdk.io.datadog.DatadogIO} class provides a
{@link
+ * org.apache.beam.sdk.transforms.PTransform} that allows writing data to the
Datadog Logs API.
+ *
+ * <p>For more information on the Datadog Logs API, see the <a
+ * href="https://docs.datadoghq.com/api/latest/logs/">official
documentation</a>.
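+ *
+ * <p>Records are represented as {@link org.apache.beam.sdk.io.datadog.DatadogEvent} values. A
+ * minimal sketch of building one (all field values below are placeholders):
+ *
+ * <pre>{@code
+ * DatadogEvent event =
+ *     DatadogEvent.newBuilder()
+ *         .withSource("my-source")
+ *         .withTags("env:dev")
+ *         .withHostname("my-host")
+ *         .withService("my-service")
+ *         .withMessage("{\"log\": \"hello\"}")
+ *         .build();
+ * }</pre>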
+ */
+package org.apache.beam.sdk.io.datadog;
diff --git
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventCoderTest.java
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventCoderTest.java
new file mode 100644
index 00000000000..f1dad0784af
--- /dev/null
+++
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventCoderTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.junit.Test;
+
+/** Unit tests for {@link DatadogEventCoder} class. */
+public class DatadogEventCoderTest {
+
+ /**
+ * Test whether {@link DatadogEventCoder} is able to encode/decode a {@link
DatadogEvent}
+ * correctly.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testEncodeDecode() throws IOException {
+
+ String source = "test-source";
+ String tags = "test-tags";
+ String hostname = "test-hostname";
+ String service = "test-service";
+ String message = "test-message";
+
+ DatadogEvent actualEvent =
+ DatadogEvent.newBuilder()
+ .withSource(source)
+ .withTags(tags)
+ .withHostname(hostname)
+ .withService(service)
+ .withMessage(message)
+ .build();
+
+ DatadogEventCoder coder = DatadogEventCoder.of();
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+ coder.encode(actualEvent, bos);
+ try (ByteArrayInputStream bin = new
ByteArrayInputStream(bos.toByteArray())) {
+ DatadogEvent decodedEvent = coder.decode(bin);
+ assertThat(decodedEvent, is(equalTo(actualEvent)));
+ }
+ }
+ }
+}
diff --git
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisherTest.java
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisherTest.java
new file mode 100644
index 00000000000..17f6e7a6e15
--- /dev/null
+++
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisherTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.util.ExponentialBackOff;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.mockserver.configuration.ConfigurationProperties;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.MediaType;
+import org.mockserver.verify.VerificationTimes;
+
+/** Unit tests for {@link DatadogEventPublisher} class. */
+public class DatadogEventPublisherTest {
+
+ private static final String EXPECTED_PATH = "/" +
DatadogEventPublisher.DD_URL_PATH;
+
+ private static final DatadogEvent DATADOG_TEST_EVENT_1 =
+ DatadogEvent.newBuilder()
+ .withSource("test-source-1")
+ .withTags("test-tags-1")
+ .withHostname("test-hostname-1")
+ .withService("test-service-1")
+ .withMessage("test-message-1")
+ .build();
+
+ private static final DatadogEvent DATADOG_TEST_EVENT_2 =
+ DatadogEvent.newBuilder()
+ .withSource("test-source-2")
+ .withTags("test-tags-2")
+ .withHostname("test-hostname-2")
+ .withService("test-service-2")
+ .withMessage("test-message-2")
+ .build();
+
+ private static final List<DatadogEvent> DATADOG_EVENTS =
+ ImmutableList.of(DATADOG_TEST_EVENT_1, DATADOG_TEST_EVENT_2);
+
+ /** Test whether {@link HttpContent} is created from the list of {@link
DatadogEvent}s. */
+ @Test
+ public void contentTest() throws NoSuchAlgorithmException,
KeyManagementException, IOException {
+
+ DatadogEventPublisher publisher =
+ DatadogEventPublisher.newBuilder()
+ .withUrl("http://example.com")
+ .withApiKey("test-api-key")
+ .build();
+
+ String expectedString =
+ "["
+ + "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\","
+ +
"\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\","
+ + "\"message\":\"test-message-1\"},"
+ + "{\"ddsource\":\"test-source-2\",\"ddtags\":\"test-tags-2\","
+ +
"\"hostname\":\"test-hostname-2\",\"service\":\"test-service-2\","
+ + "\"message\":\"test-message-2\"}"
+ + "]";
+
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+ HttpContent actualContent = publisher.getContent(DATADOG_EVENTS);
+ actualContent.writeTo(bos);
+ String actualString = new String(bos.toByteArray(),
StandardCharsets.UTF_8);
+ assertThat(actualString, is(equalTo(expectedString)));
+ }
+ }
+
+ @Test
+ public void genericURLTest() throws IOException {
+
+ String baseURL = "http://example.com";
+ DatadogEventPublisher.Builder builder =
+
DatadogEventPublisher.newBuilder().withUrl(baseURL).withApiKey("test-api-key");
+
+ assertThat(
+ builder.genericUrl(),
+ is(equalTo(new GenericUrl(Joiner.on('/').join(baseURL,
"api/v2/logs")))));
+ }
+
+ @Test
+ public void configureBackOffDefaultTest()
+ throws NoSuchAlgorithmException, KeyManagementException, IOException {
+
+ DatadogEventPublisher publisherDefaultBackOff =
+ DatadogEventPublisher.newBuilder()
+ .withUrl("http://example.com")
+ .withApiKey("test-api-key")
+ .build();
+
+ assertThat(
+
publisherDefaultBackOff.getConfiguredBackOff().getMaxElapsedTimeMillis(),
+ is(equalTo(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS)));
+ }
+
+ @Test
+ public void configureBackOffCustomTest()
+ throws NoSuchAlgorithmException, KeyManagementException, IOException {
+
+ int timeoutInMillis = 600000; // 10 minutes
+ DatadogEventPublisher publisherWithBackOff =
+ DatadogEventPublisher.newBuilder()
+ .withUrl("http://example.com")
+ .withApiKey("test-api-key")
+ .withMaxElapsedMillis(timeoutInMillis)
+ .build();
+
+ assertThat(
+ publisherWithBackOff.getConfiguredBackOff().getMaxElapsedTimeMillis(),
+ is(equalTo(timeoutInMillis)));
+ }
+
+ @Test
+ public void requestHeadersTest() throws Exception {
+ ConfigurationProperties.disableSystemOut(true);
+ try (ClientAndServer mockServer = startClientAndServer()) {
+ mockServer
+ .when(org.mockserver.model.HttpRequest.request(EXPECTED_PATH))
+
.respond(org.mockserver.model.HttpResponse.response().withStatusCode(202));
+
+ DatadogEventPublisher publisher =
+ DatadogEventPublisher.newBuilder()
+ .withUrl(Joiner.on(':').join("http://localhost",
mockServer.getPort()))
+ .withApiKey("test-api-key")
+ .build();
+
+ DatadogEvent event =
+ DatadogEvent.newBuilder()
+ .withSource("test-source-1")
+ .withTags("test-tags-1")
+ .withHostname("test-hostname-1")
+ .withService("test-service-1")
+ .withMessage("test-message-1")
+ .build();
+
+ HttpResponse response = publisher.execute(ImmutableList.of(event));
+ assertThat(response.getStatusCode(), is(equalTo(202)));
+
+ mockServer.verify(
+ org.mockserver.model.HttpRequest.request(EXPECTED_PATH)
+ .withContentType(MediaType.APPLICATION_JSON)
+ .withHeader("dd-api-key", "test-api-key")
+ .withHeader("dd-evp-origin", "dataflow")
+ .withHeader("Accept-Encoding", "gzip"),
+ VerificationTimes.once());
+ }
+ }
+}
diff --git
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializerTest.java
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializerTest.java
new file mode 100644
index 00000000000..15b127da2f0
--- /dev/null
+++
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.List;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+public class DatadogEventSerializerTest {
+
+ private static final DatadogEvent DATADOG_TEST_EVENT_1 =
+ DatadogEvent.newBuilder()
+ .withSource("test-source-1")
+ .withTags("test-tags-1")
+ .withHostname("test-hostname-1")
+ .withService("test-service-1")
+ .withMessage("test-message-1")
+ .build();
+
+ private static final DatadogEvent DATADOG_TEST_EVENT_2 =
+ DatadogEvent.newBuilder()
+ .withSource("test-source-2")
+ .withTags("test-tags-2")
+ .withHostname("test-hostname-2")
+ .withService("test-service-2")
+ .withMessage("test-message-2")
+ .build();
+
+ private static final List<DatadogEvent> DATADOG_EVENTS =
+ ImmutableList.of(DATADOG_TEST_EVENT_1, DATADOG_TEST_EVENT_2);
+
+  /** Test whether a list of events is stringified as expected. */
+ @Test
+ public void stringPayloadTest_list() {
+ String actual = DatadogEventSerializer.getPayloadString(DATADOG_EVENTS);
+
+ String expected =
+ "["
+ + "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\","
+ +
"\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\","
+ + "\"message\":\"test-message-1\"},"
+ + "{\"ddsource\":\"test-source-2\",\"ddtags\":\"test-tags-2\","
+ +
"\"hostname\":\"test-hostname-2\",\"service\":\"test-service-2\","
+ + "\"message\":\"test-message-2\"}"
+ + "]";
+
+ assertThat(expected, is(equalTo(actual)));
+ }
+
+  /** Test whether a single event is stringified as expected. */
+ @Test
+ public void stringPayloadTest_single() {
+ String actual =
DatadogEventSerializer.getPayloadString(DATADOG_TEST_EVENT_1);
+
+ String expected =
+ "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\","
+ +
"\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\","
+ + "\"message\":\"test-message-1\"}";
+
+ assertThat(expected, is(equalTo(actual)));
+ }
+
+ /** Test payload size calculation for a payload string. */
+ @Test
+ public void stringPayloadSizeTest() {
+ long actual =
+ DatadogEventSerializer.getPayloadSize(
+ "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\","
+ +
"\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\","
+ + "\"message\":\"test-message-1\"}");
+
+ long expected = 134L;
+
+ assertThat(expected, is(equalTo(actual)));
+ }
+}
diff --git
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventTest.java
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventTest.java
new file mode 100644
index 00000000000..de1759faafb
--- /dev/null
+++
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.junit.Test;
+
+/** Unit tests for {@link DatadogEvent} class. */
+public class DatadogEventTest {
+
+ /** Test whether a {@link DatadogEvent} created via its builder can be
compared correctly. */
+ @Test
+ public void testEquals() {
+ String source = "test-source";
+ String tags = "test-tags";
+ String hostname = "test-hostname";
+ String service = "test-service";
+ String message = "test-message";
+
+ DatadogEvent actualEvent =
+ DatadogEvent.newBuilder()
+ .withSource(source)
+ .withTags(tags)
+ .withHostname(hostname)
+ .withService(service)
+ .withMessage(message)
+ .build();
+
+ assertThat(
+ actualEvent,
+ is(
+ equalTo(
+ DatadogEvent.newBuilder()
+ .withSource(source)
+ .withTags(tags)
+ .withHostname(hostname)
+ .withService(service)
+ .withMessage(message)
+ .build())));
+
+ assertThat(
+ actualEvent,
+ is(
+ not(
+ equalTo(
+ DatadogEvent.newBuilder()
+ .withSource(source)
+ .withTags(tags)
+ .withHostname(hostname)
+ .withService(service)
+ .withMessage("a-different-test-message")
+ .build()))));
+ }
+}
diff --git
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventWriterTest.java
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventWriterTest.java
new file mode 100644
index 00000000000..086bb93f53e
--- /dev/null
+++
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventWriterTest.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockserver.configuration.ConfigurationProperties;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.matchers.Times;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+
+/** Unit tests for {@link DatadogEventWriter} class. */
+public class DatadogEventWriterTest {
+
+ private static final String EXPECTED_PATH = "/" +
DatadogEventPublisher.DD_URL_PATH;
+
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  // We create a mock server to simulate an actual Datadog API server.
+ private ClientAndServer mockServer;
+
+ @Before
+ public void setup() {
+ ConfigurationProperties.disableSystemOut(true);
+ mockServer = startClientAndServer();
+ }
+
+ @After
+ public void tearDown() {
+ if (mockServer != null) {
+ mockServer.stop();
+ }
+ }
+
+ /** Test building {@link DatadogEventWriter} with missing URL. */
+ @Test
+ public void eventWriterMissingURL() {
+
+ Exception thrown =
+ assertThrows(NullPointerException.class, () ->
DatadogEventWriter.newBuilder().build());
+
+ assertThat(thrown).hasMessageThat().contains("url needs to be provided");
+ }
+
+ /** Test building {@link DatadogEventWriter} with missing URL protocol. */
+ @Test
+ public void eventWriterMissingURLProtocol() {
+
+ Exception thrown =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> DatadogEventWriter.newBuilder().withUrl("test-url").build());
+
+
assertThat(thrown).hasMessageThat().contains(DatadogEventWriter.INVALID_URL_FORMAT_MESSAGE);
+ }
+
+ /** Test building {@link DatadogEventWriter} with an invalid URL. */
+ @Test
+ public void eventWriterInvalidURL() {
+
+ Exception thrown =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
DatadogEventWriter.newBuilder().withUrl("http://1.2.3").build());
+
+
assertThat(thrown).hasMessageThat().contains(DatadogEventWriter.INVALID_URL_FORMAT_MESSAGE);
+ }
+
+ /** Test building {@link DatadogEventWriter} with the 'api/v2/logs' path
appended to the URL. */
+ @Test
+ public void eventWriterFullEndpoint() {
+
+ Exception thrown =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ DatadogEventWriter.newBuilder()
+ .withUrl("http://test-url:8088/api/v2/logs")
+ .build());
+
+
assertThat(thrown).hasMessageThat().contains(DatadogEventWriter.INVALID_URL_FORMAT_MESSAGE);
+ }
+
+ /** Test building {@link DatadogEventWriter} with missing token. */
+ @Test
+ public void eventWriterMissingToken() {
+
+ Exception thrown =
+ assertThrows(
+ NullPointerException.class,
+ () ->
DatadogEventWriter.newBuilder().withUrl("http://test-url").build());
+
+ assertThat(thrown).hasMessageThat().contains("apiKey needs to be
provided");
+ }
+
+ /** Test building {@link DatadogEventWriter} with default batch count. */
+ @Test
+ public void eventWriterDefaultBatchCount() {
+
+ DatadogEventWriter writer =
+ DatadogEventWriter.newBuilder()
+ .withUrl("http://test-url")
+ .withApiKey("test-api-key")
+ .build();
+
+ assertThat(writer.inputBatchCount()).isNull();
+ }
+
+ /**
+ * Test building {@link DatadogEventWriter} with a batchCount less than the
configured minimum.
+ */
+ @Test
+ public void eventWriterBatchCountTooSmall() {
+
+ Exception thrown =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ DatadogEventWriter.newBuilder(7)
+ .withUrl("http://test-url")
+ .withApiKey("test-api-key")
+ .withInputBatchCount(6)
+ .build());
+
+ assertThat(thrown)
+ .hasMessageThat()
+ .contains("inputBatchCount must be greater than or equal to 7");
+ }
+
+ /** Test building {@link DatadogEventWriter} with a batchCount greater than
1000. */
+ @Test
+ public void eventWriterBatchCountTooBig() {
+
+ Exception thrown =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ DatadogEventWriter.newBuilder()
+ .withUrl("http://test-url")
+ .withApiKey("test-api-key")
+ .withInputBatchCount(1001)
+ .build());
+
+ assertThat(thrown)
+ .hasMessageThat()
+ .contains("inputBatchCount must be less than or equal to 1000");
+ }
+
+  /** Test building {@link DatadogEventWriter} with a custom batchCount. */
+ @Test
+ public void eventWriterCustomBatchCountAndValidation() {
+
+ Integer batchCount = 30;
+ DatadogEventWriter writer =
+ DatadogEventWriter.newBuilder()
+ .withUrl("http://test-url")
+ .withApiKey("test-api-key")
+ .withInputBatchCount(batchCount)
+ .build();
+
+ assertThat(writer.inputBatchCount()).isEqualTo(batchCount);
+ }
+
+  /** Test building {@link DatadogEventWriter} with the default maxBufferSize. */
+ @Test
+ public void eventWriterDefaultMaxBufferSize() {
+
+ DatadogEventWriter writer =
+ DatadogEventWriter.newBuilder()
+ .withUrl("http://test-url")
+ .withApiKey("test-api-key")
+ .build();
+
+ assertThat(writer.maxBufferSize()).isNull();
+ }
+
+  /** Test building {@link DatadogEventWriter} with a custom maxBufferSize. */
+ @Test
+ public void eventWriterCustomMaxBufferSizeAndValidation() {
+
+ Long maxBufferSize = 1_427_841L;
+ DatadogEventWriter writer =
+ DatadogEventWriter.newBuilder()
+ .withUrl("http://test-url")
+ .withMaxBufferSize(maxBufferSize)
+ .withApiKey("test-api-key")
+ .build();
+
+ assertThat(writer.maxBufferSize()).isEqualTo(maxBufferSize);
+ }
+
+ /** Test successful POST request for single batch. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void successfulDatadogWriteSingleBatchTest() {
+
+ // Create server expectation for success.
+ addRequestExpectation(202);
+
+ int testPort = mockServer.getPort();
+
+ List<KV<Integer, DatadogEvent>> testEvents =
+ ImmutableList.of(
+ KV.of(
+ 123,
+ DatadogEvent.newBuilder()
+ .withSource("test-source-1")
+ .withTags("test-tags-1")
+ .withHostname("test-hostname-1")
+ .withService("test-service-1")
+ .withMessage("test-message-1")
+ .build()),
+ KV.of(
+ 123,
+ DatadogEvent.newBuilder()
+ .withSource("test-source-2")
+ .withTags("test-tags-2")
+ .withHostname("test-hostname-2")
+ .withService("test-service-2")
+ .withMessage("test-message-2")
+ .build()));
+
+ PCollection<DatadogWriteError> actual =
+ pipeline
+ .apply(
+ "Create Input data",
+ Create.of(testEvents)
+ .withCoder(KvCoder.of(BigEndianIntegerCoder.of(),
DatadogEventCoder.of())))
+ .apply(
+ "DatadogEventWriter",
+ ParDo.of(
+ DatadogEventWriter.newBuilder(1)
+ .withUrl(Joiner.on(':').join("http://localhost",
testPort))
+ .withInputBatchCount(1) // Test one request per
DatadogEvent
+ .withApiKey("test-api-key")
+ .build()))
+ .setCoder(DatadogWriteErrorCoder.of());
+
+ // All successful responses.
+ PAssert.that(actual).empty();
+
+ pipeline.run();
+
+ // Server received exactly the expected number of POST requests.
+ mockServer.verify(
+ HttpRequest.request(EXPECTED_PATH),
VerificationTimes.exactly(testEvents.size()));
+ }
+
+ /** Test successful POST request for multi batch. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void successfulDatadogWriteMultiBatchTest() {
+
+ // Create server expectation for success.
+ addRequestExpectation(202);
+
+ int testPort = mockServer.getPort();
+
+ List<KV<Integer, DatadogEvent>> testEvents =
+ ImmutableList.of(
+ KV.of(
+ 123,
+ DatadogEvent.newBuilder()
+ .withSource("test-source-1")
+ .withTags("test-tags-1")
+ .withHostname("test-hostname-1")
+ .withService("test-service-1")
+ .withMessage("test-message-1")
+ .build()),
+ KV.of(
+ 123,
+ DatadogEvent.newBuilder()
+ .withSource("test-source-2")
+ .withTags("test-tags-2")
+ .withHostname("test-hostname-2")
+ .withService("test-service-2")
+ .withMessage("test-message-2")
+ .build()));
+
+ PCollection<DatadogWriteError> actual =
+ pipeline
+ .apply(
+ "Create Input data",
+ Create.of(testEvents)
+ .withCoder(KvCoder.of(BigEndianIntegerCoder.of(),
DatadogEventCoder.of())))
+ .apply(
+ "DatadogEventWriter",
+ ParDo.of(
+ DatadogEventWriter.newBuilder(1)
+ .withUrl(Joiner.on(':').join("http://localhost",
testPort))
+ .withInputBatchCount(testEvents.size()) // all
requests in a single batch.
+ .withApiKey("test-api-key")
+ .build()))
+ .setCoder(DatadogWriteErrorCoder.of());
+
+ // All successful responses.
+ PAssert.that(actual).empty();
+
+ pipeline.run();
+
+ // Server received exactly one POST request.
+ mockServer.verify(HttpRequest.request(EXPECTED_PATH),
VerificationTimes.once());
+ }
+
+ /** Test successful POST requests for batch exceeding max buffer size. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void successfulDatadogWriteExceedingMaxBufferSize() {
+
+ // Create server expectation for success.
+ addRequestExpectation(202);
+
+ int testPort = mockServer.getPort();
+
+ String payloadFormat = "{\"message\":\"%s\"}";
+ long jsonSize =
DatadogEventSerializer.getPayloadSize(String.format(payloadFormat, ""));
+
+ long maxBufferSize = 100;
+ long msgSize = 50;
+
+ char[] bunchOfAs = new char[(int) (msgSize - jsonSize)];
+ Arrays.fill(bunchOfAs, 'a');
+
+ List<KV<Integer, DatadogEvent>> testEvents = new ArrayList<>();
+ for (int i = 1; i <= 3; i++) {
+ testEvents.add(
+ KV.of(123, DatadogEvent.newBuilder().withMessage(new
String(bunchOfAs)).build()));
+ }
+
+ PCollection<DatadogWriteError> actual =
+ pipeline
+ .apply(
+ "Create Input data",
+ Create.of(testEvents)
+ .withCoder(KvCoder.of(BigEndianIntegerCoder.of(),
DatadogEventCoder.of())))
+ .apply(
+ "DatadogEventWriter",
+ ParDo.of(
+ DatadogEventWriter.newBuilder(1)
+ .withUrl(Joiner.on(':').join("http://localhost",
testPort))
+ .withInputBatchCount(testEvents.size())
+ .withMaxBufferSize(maxBufferSize)
+ .withApiKey("test-api-key")
+ .build()))
+ .setCoder(DatadogWriteErrorCoder.of());
+
+ // All successful responses.
+ PAssert.that(actual).empty();
+
+ pipeline.run();
+
+ // Server received exactly two POST requests:
+ // 1st batch of size=2 due to next msg exceeding max buffer size
+ // 2nd batch of size=1 due to timer
+ mockServer.verify(HttpRequest.request(EXPECTED_PATH),
VerificationTimes.exactly(2));
+ }
+
+ /** Test failed POST request. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void failedDatadogWriteSingleBatchTest() {
+
+ // Create server expectation for FAILURE.
+ addRequestExpectation(404);
+
+ int testPort = mockServer.getPort();
+
+ List<KV<Integer, DatadogEvent>> testEvents =
+ ImmutableList.of(
+ KV.of(
+ 123,
+ DatadogEvent.newBuilder()
+ .withSource("test-source-1")
+ .withTags("test-tags-1")
+ .withHostname("test-hostname-1")
+ .withService("test-service-1")
+ .withMessage("test-message-1")
+ .build()));
+
+ PCollection<DatadogWriteError> actual =
+ pipeline
+ .apply(
+ "Create Input data",
+ Create.of(testEvents)
+ .withCoder(KvCoder.of(BigEndianIntegerCoder.of(),
DatadogEventCoder.of())))
+ .apply(
+ "DatadogEventWriter",
+ ParDo.of(
+ DatadogEventWriter.newBuilder(1)
+ .withUrl(Joiner.on(':').join("http://localhost",
testPort))
+ .withInputBatchCount(testEvents.size()) // all
requests in a single batch.
+ .withApiKey("test-api-key")
+ .build()))
+ .setCoder(DatadogWriteErrorCoder.of());
+
+ // Expect a single 404 Not found DatadogWriteError
+ PAssert.that(actual)
+ .containsInAnyOrder(
+ DatadogWriteError.newBuilder()
+ .withStatusCode(404)
+ .withStatusMessage("Not Found")
+ .withPayload(
+ "{\"ddsource\":\"test-source-1\","
+ +
"\"ddtags\":\"test-tags-1\",\"hostname\":\"test-hostname-1\","
+ +
"\"service\":\"test-service-1\",\"message\":\"test-message-1\"}")
+ .build());
+
+ pipeline.run();
+
+ // Server received exactly one POST request.
+ mockServer.verify(HttpRequest.request(EXPECTED_PATH),
VerificationTimes.once());
+ }
+
+ /** Test failed due to single event exceeding max buffer size. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void failedDatadogEventTooBig() {
+
+ // Create server expectation for FAILURE.
+ addRequestExpectation(404);
+
+ int testPort = mockServer.getPort();
+
+ String payloadFormat = "{\"message\":\"%s\"}";
+
+ long maxBufferSize = 100;
+ char[] bunchOfAs =
+ new char
+ [(int)
+ (maxBufferSize
+ + 1L
+ -
DatadogEventSerializer.getPayloadSize(String.format(payloadFormat, "")))];
+ Arrays.fill(bunchOfAs, 'a');
+ String messageTooBig = new String(bunchOfAs);
+
+ String expectedPayload = String.format(payloadFormat, messageTooBig);
+ long expectedPayloadSize =
DatadogEventSerializer.getPayloadSize(expectedPayload);
+ assertThat(maxBufferSize + 1L).isEqualTo(expectedPayloadSize);
+
+ List<KV<Integer, DatadogEvent>> testEvents =
+ ImmutableList.of(KV.of(123,
DatadogEvent.newBuilder().withMessage(messageTooBig).build()));
+
+ PCollection<DatadogWriteError> actual =
+ pipeline
+ .apply(
+ "Create Input data",
+ Create.of(testEvents)
+ .withCoder(KvCoder.of(BigEndianIntegerCoder.of(),
DatadogEventCoder.of())))
+ .apply(
+ "DatadogEventWriter",
+ ParDo.of(
+ DatadogEventWriter.newBuilder()
+ .withUrl(Joiner.on(':').join("http://localhost",
testPort))
+ .withMaxBufferSize(maxBufferSize)
+ .withApiKey("test-api-key")
+ .build()))
+ .setCoder(DatadogWriteErrorCoder.of());
+
+ // Expect a single DatadogWriteError due to exceeding max buffer size
+ PAssert.that(actual)
+
.containsInAnyOrder(DatadogWriteError.newBuilder().withPayload(expectedPayload).build());
+
+ pipeline.run();
+
+ // Server did not receive any requests.
+ mockServer.verify(HttpRequest.request(EXPECTED_PATH),
VerificationTimes.exactly(0));
+ }
+
+ /** Test retryable POST request. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void retryableDatadogWriteSingleBatchTest() {
+
+ // Create server expectations for 3 retryable failures, 1 success.
+ addRequestExpectation(408, Times.once());
+ addRequestExpectation(429, Times.once());
+ addRequestExpectation(502, Times.once());
+ addRequestExpectation(202, Times.once());
+
+ int testPort = mockServer.getPort();
+
+ List<KV<Integer, DatadogEvent>> testEvents =
+ ImmutableList.of(
+ KV.of(
+ 123,
+ DatadogEvent.newBuilder()
+ .withSource("test-source-1")
+ .withTags("test-tags-1")
+ .withHostname("test-hostname-1")
+ .withService("test-service-1")
+ .withMessage("test-message-1")
+ .build()));
+
+ PCollection<DatadogWriteError> actual =
+ pipeline
+ .apply(
+ "Create Input data",
+ Create.of(testEvents)
+ .withCoder(KvCoder.of(BigEndianIntegerCoder.of(),
DatadogEventCoder.of())))
+ .apply(
+ "DatadogEventWriter",
+ ParDo.of(
+ DatadogEventWriter.newBuilder(1)
+ .withUrl(Joiner.on(':').join("http://localhost",
testPort))
+ .withInputBatchCount(testEvents.size()) // all
requests in a single batch.
+ .withApiKey("test-api-key")
+ .build()))
+ .setCoder(DatadogWriteErrorCoder.of());
+
+ PAssert.that(actual).empty();
+
+ // All successful responses, eventually.
+ pipeline.run();
+
+ // Server received exactly 4 POST requests (3 retryable failures, 1
success).
+ mockServer.verify(HttpRequest.request(EXPECTED_PATH),
VerificationTimes.exactly(4));
+ }
+
+ private void addRequestExpectation(int statusCode) {
+ addRequestExpectation(statusCode, Times.unlimited());
+ }
+
+ private void addRequestExpectation(int statusCode, Times times) {
+ mockServer
+ .when(HttpRequest.request(EXPECTED_PATH), times)
+ .respond(HttpResponse.response().withStatusCode(statusCode));
+ }
+}
diff --git
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogIOTest.java
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogIOTest.java
new file mode 100644
index 00000000000..8680333b4dd
--- /dev/null
+++
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogIOTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockserver.configuration.ConfigurationProperties;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.verify.VerificationTimes;
+
+/** Unit tests for {@link DatadogIO} class. */
+public class DatadogIOTest {
+
+ private static final DatadogEvent DATADOG_TEST_EVENT_1 =
+ DatadogEvent.newBuilder()
+ .withSource("test-source-1")
+ .withTags("test-tags-1")
+ .withHostname("test-hostname-1")
+ .withService("test-service-1")
+ .withMessage("test-message-1")
+ .build();
+
+ private static final DatadogEvent DATADOG_TEST_EVENT_2 =
+ DatadogEvent.newBuilder()
+ .withSource("test-source-2")
+ .withTags("test-tags-2")
+ .withHostname("test-hostname-2")
+ .withService("test-service-2")
+ .withMessage("test-message-2")
+ .build();
+
+ private static final List<DatadogEvent> DATADOG_EVENTS =
+ ImmutableList.of(DATADOG_TEST_EVENT_1, DATADOG_TEST_EVENT_2);
+
+ private static final String EXPECTED_PATH = "/" +
DatadogEventPublisher.DD_URL_PATH;
+ private static final int TEST_PARALLELISM = 2;
+
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ // We create a mock server to simulate an actual Datadog API server.
+ private ClientAndServer mockServer;
+
+ @Before
+ public void setup() throws IOException {
+ ConfigurationProperties.disableSystemOut(true);
+ mockServer = startClientAndServer();
+ }
+
+ /** Test successful multi-event POST request for DatadogIO without
parallelism. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void successfulDatadogIOMultiBatchNoParallelismTest() {
+
+ // Create server expectation for success.
+ mockServerListening(200);
+ PCollection<DatadogWriteError> actual =
+ pipeline
+ .apply("Create Input data",
Create.of(DATADOG_EVENTS).withCoder(DatadogEventCoder.of()))
+ .apply(
+ "DatadogIO",
+ DatadogIO.writeBuilder(1)
+ .withParallelism(1)
+ .withBatchCount(DATADOG_EVENTS.size())
+ .withApiKey("test-api-key")
+ .withUrl(Joiner.on(':').join("http://localhost",
mockServer.getPort()))
+ .build())
+ .setCoder(DatadogWriteErrorCoder.of());
+
+ // All successful responses.
+ PAssert.that(actual).empty();
+
+ pipeline.run();
+
+ // Server received exactly one POST request.
+ mockServer.verify(HttpRequest.request(EXPECTED_PATH),
VerificationTimes.once());
+ }
+
+ /** Test successful multi-event POST request for DatadogIO with parallelism.
*/
+ @Test
+ @Category(NeedsRunner.class)
+ public void successfulDatadogIOMultiBatchParallelismTest() {
+
+ // Create server expectation for success.
+ mockServerListening(200);
+ PCollection<DatadogWriteError> actual =
+ pipeline
+ .apply("Create Input data",
Create.of(DATADOG_EVENTS).withCoder(DatadogEventCoder.of()))
+ .apply(
+ "DatadogIO",
+ DatadogIO.writeBuilder(1)
+ .withParallelism(TEST_PARALLELISM)
+ .withBatchCount(DATADOG_EVENTS.size())
+ .withApiKey("test-api-key")
+ .withUrl(Joiner.on(':').join("http://localhost",
mockServer.getPort()))
+ .build())
+ .setCoder(DatadogWriteErrorCoder.of());
+
+ // All successful responses.
+ PAssert.that(actual).empty();
+
+ pipeline.run();
+
+    // Server received at least one POST request; random keying may place both events on one key.
+ mockServer.verify(HttpRequest.request(EXPECTED_PATH),
VerificationTimes.atLeast(1));
+ }
+
+  /** Test successful single-event POST requests for DatadogIO with parallelism. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void successfulDatadogIOSingleBatchParallelismTest() {
+
+ // Create server expectation for success.
+ mockServerListening(200);
+ PCollection<DatadogWriteError> actual =
+ pipeline
+ .apply("Create Input data",
Create.of(DATADOG_EVENTS).withCoder(DatadogEventCoder.of()))
+ .apply(
+ "DatadogIO",
+ DatadogIO.writeBuilder(1)
+ .withParallelism(TEST_PARALLELISM)
+ .withBatchCount(1)
+ .withApiKey("test-api-key")
+ .withUrl(Joiner.on(':').join("http://localhost",
mockServer.getPort()))
+ .build())
+ .setCoder(DatadogWriteErrorCoder.of());
+
+ // All successful responses.
+ PAssert.that(actual).empty();
+
+ pipeline.run();
+
+ // Server received exactly 1 post request per DatadogEvent
+ mockServer.verify(
+ HttpRequest.request(EXPECTED_PATH),
VerificationTimes.exactly(DATADOG_EVENTS.size()));
+ }
+
+ private void mockServerListening(int statusCode) {
+ mockServer
+ .when(HttpRequest.request(EXPECTED_PATH))
+ .respond(HttpResponse.response().withStatusCode(statusCode));
+ }
+}
diff --git
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoderTest.java
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoderTest.java
new file mode 100644
index 00000000000..e5932d2b612
--- /dev/null
+++
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.junit.Test;
+
+/** Unit tests for {@link DatadogWriteErrorCoder} class. */
+public class DatadogWriteErrorCoderTest {
+
+ /**
+ * Test whether {@link DatadogWriteErrorCoder} is able to encode/decode a
{@link
+ * DatadogWriteError} correctly.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testEncodeDecode() throws IOException {
+
+ String payload = "test-payload";
+ String message = "test-message";
+ Integer statusCode = 123;
+
+ DatadogWriteError actualError =
+ DatadogWriteError.newBuilder()
+ .withPayload(payload)
+ .withStatusCode(statusCode)
+ .withStatusMessage(message)
+ .build();
+
+ DatadogWriteErrorCoder coder = DatadogWriteErrorCoder.of();
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+ coder.encode(actualError, bos);
+ try (ByteArrayInputStream bin = new
ByteArrayInputStream(bos.toByteArray())) {
+ DatadogWriteError decodedWriteError = coder.decode(bin);
+ assertThat(decodedWriteError, is(equalTo(actualError)));
+ }
+ }
+ }
+}
diff --git
a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorTest.java
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorTest.java
new file mode 100644
index 00000000000..0aadc1f7018
--- /dev/null
+++
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.junit.Test;
+
+/** Unit tests for {@link DatadogWriteError} class. */
+public class DatadogWriteErrorTest {
+
+ /** Test whether a {@link DatadogWriteError} created via its builder can be
compared correctly. */
+ @Test
+ public void testEquals() {
+
+ String payload = "test-payload";
+ String message = "test-message";
+ Integer statusCode = 123;
+
+ DatadogWriteError actualError =
+ DatadogWriteError.newBuilder()
+ .withPayload(payload)
+ .withStatusCode(statusCode)
+ .withStatusMessage(message)
+ .build();
+
+ assertThat(
+ actualError,
+ is(
+ equalTo(
+ DatadogWriteError.newBuilder()
+ .withPayload(payload)
+ .withStatusCode(statusCode)
+ .withStatusMessage(message)
+ .build())));
+
+ assertThat(
+ actualError,
+ is(
+ not(
+ equalTo(
+ DatadogWriteError.newBuilder()
+ .withPayload(payload)
+ .withStatusCode(statusCode)
+ .withStatusMessage("a-different-message")
+ .build()))));
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index d99000383ea..4540fa4b597 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -225,6 +225,7 @@ include(":sdks:java:io:file-based-io-tests")
include(":sdks:java:io:bigquery-io-perf-tests")
include(":sdks:java:io:cdap")
include(":sdks:java:io:csv")
+include(":sdks:java:io:datadog")
include(":sdks:java:io:file-schema-transform")
include(":sdks:java:io:google-ads")
include(":sdks:java:io:google-cloud-platform")