gemini-code-assist[bot] commented on code in PR #38362:
URL: https://github.com/apache/beam/pull/38362#discussion_r3184197410
##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -534,6 +561,171 @@ def temp_pubsub_emulator(project_id="apache-beam-testing"):
yield created_topic_object.name
+class DatadogContainer(DockerContainer):
+ """
+ DatadogContainer starts a Datadog agent container for integration tests.
+ It exposes ports for DogStatsD (metrics) and the trace agent (APM).
+ """
+ def __init__(self, image="datadog/agent:latest"):
+ super().__init__(image)
+ self.statsd_port = 8125
+ self.trace_port = 8126
+ # An API key is required, but for local testing against the agent,
+ # it doesn't have to be a valid one.
+ self.with_env("DD_API_KEY", "dummy_key_for_testing")
+ # Redirect agent's own telemetry to prevent 403 errors from the real site
+ self.with_env("DD_DD_URL", "http://localhost:1234")
+ # Disable log collection for test purposes to reduce noise
+ self.with_env("DD_LOGS_ENABLED", "true")
+ self.with_exposed_ports(self.statsd_port, self.trace_port)
+
+ def start(self):
+ super().start()
+ # Wait for the agent to be ready to receive traces and metrics.
+ # "Agent started" indicates the core agent is up.
+ # Disabling this wait as the container is reported as healthy and
+ # this specific log might not appear with dummy URLs.
+ # wait_for_logs(self, "Agent started", timeout=120)
+ return self
+
+ def get_statsd_host(self):
+ return self.get_container_host_ip()
+
+ def get_statsd_port(self):
+ return self.get_exposed_port(self.statsd_port)
+
+ def get_trace_agent_host(self):
+ return self.get_container_host_ip()
+
+ def get_trace_agent_port(self):
+ return self.get_exposed_port(self.trace_port)
+
+ def get_api_key(self):
+ return "dummy_key_for_testing"
+
+ def get_logs_url(self):
+ # The trace agent and logs agent listen on the same port by default
+ return \
+ f"http://{self.get_container_host_ip()}:{self.get_trace_agent_port()}"
Review Comment:

The line continuation backslash is unnecessary here as the expression is
part of a return statement. Additionally, the indentation of the following line
is inconsistent. The code can be simplified into a single line for better
readability.
```python
    return f"http://{self.get_container_host_ip()}:{self.get_trace_agent_port()}"
```
##########
sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+@AutoService(SchemaTransformProvider.class)
+public class DatadogWriteSchemaTransformProvider
+ extends
TypedSchemaTransformProvider<DatadogWriteSchemaTransformConfiguration> {
+ private static final String IDENTIFIER =
"beam:schematransform:org.apache.beam:datadog_write:v1";
+ static final String INPUT = "input";
+ static final String OUTPUT = "output";
+ static final String ERROR = "errors";
+ public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+ public static final TupleTag<Void> OUTPUT_TAG = new TupleTag<Void>() {};
+ public static final TupleTag<DatadogEvent> EVENT_TAG = new
TupleTag<DatadogEvent>() {};
+
+ @Override
+ protected Class<DatadogWriteSchemaTransformConfiguration>
configurationClass() {
+ return DatadogWriteSchemaTransformConfiguration.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(DatadogWriteSchemaTransformConfiguration
configuration) {
+ return new DatadogWriteSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} input
collection names method. */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} output
collection names method. */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.singletonList(ERROR);
+ }
+
+ /**
+ * An implementation of {@link SchemaTransform} for Datadog Write jobs
configured using {@link
+ * DatadogWriteSchemaTransformConfiguration}.
+ */
+ static class DatadogWriteSchemaTransform extends SchemaTransform {
+ private final DatadogWriteSchemaTransformConfiguration configuration;
+
+ DatadogWriteSchemaTransform(DatadogWriteSchemaTransformConfiguration
configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ // Validate configuration parameters
+ configuration.validate();
+
+ // Obtain input rows
+ PCollection<Row> inputRows = input.get(INPUT);
+
+ // Check for errors
+ boolean handleErrors =
ErrorHandling.hasOutput(configuration.getErrorHandling());
+
+ Schema inputSchema = inputRows.getSchema();
+ Schema dynamicErrorSchema =
+ Schema.builder()
+ .addNullableRowField("failed_row", inputSchema)
+ .addNullableField("payload", Schema.FieldType.STRING)
+ .addNullableField("statusCode", Schema.FieldType.INT32)
+ .addNullableField("statusMessage", Schema.FieldType.STRING)
+ .build();
+
+ PCollectionTuple convertResult =
+ inputRows.apply(
+ "Convert to DatadogEvent",
+ ParDo.of(new RowToEventFn(handleErrors, ERROR_TAG,
dynamicErrorSchema))
+ .withOutputTags(EVENT_TAG, TupleTagList.of(ERROR_TAG)));
+
+ PCollection<DatadogEvent> datadogEvents =
+ convertResult.get(EVENT_TAG).setCoder(DatadogEventCoder.of());
+ PCollection<Row> conversionErrors =
+ convertResult
+ .get(ERROR_TAG)
+
.setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+ // Configure DatadogIO.Write
+ DatadogIO.Write.Builder builder =
+ DatadogIO.writeBuilder(configuration.getMinBatchCount())
+ .withUrl(configuration.getUrl())
+ .withApiKey(configuration.getApiKey());
+
+ Integer batchCount = configuration.getBatchCount();
+ if (batchCount != null) {
+ builder = builder.withBatchCount(batchCount);
+ }
+ Long maxBufferSize = configuration.getMaxBufferSize();
+ if (maxBufferSize != null) {
+ builder = builder.withMaxBufferSize(maxBufferSize);
+ }
+ Integer parallelism = configuration.getParallelism();
+ if (parallelism != null) {
+ builder = builder.withParallelism(parallelism);
+ }
+
+ DatadogIO.Write write = builder.build();
+
+ // Apply DatadogIO.Write
+ PCollection<DatadogWriteError> writeErrors = datadogEvents.apply("Write
To Datadog", write);
+
+ // Handle errors
+ ErrorHandling errorHandling = configuration.getErrorHandling();
+ if (errorHandling != null) {
+ PCollection<Row> writeErrorRows =
+ writeErrors
+ .apply(
+ "Convert Write Errors to Rows",
+ org.apache.beam.sdk.transforms.MapElements.into(
+ org.apache.beam.sdk.values.TypeDescriptors.rows())
+ .via(
+ error ->
+ Row.withSchema(dynamicErrorSchema)
+ .addValue(null)
+ .addValue(error.payload())
+ .addValue(error.statusCode())
+ .addValue(error.statusMessage())
+ .build()))
+
.setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+ PCollection<Row> allErrors =
+ org.apache.beam.sdk.values.PCollectionList.of(conversionErrors)
+ .and(writeErrorRows)
+ .apply("Flatten Errors",
org.apache.beam.sdk.transforms.Flatten.pCollections())
+
.setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+ return PCollectionRowTuple.of(errorHandling.getOutput(), allErrors);
+ } else {
+ writeErrors.apply("Fail on Write Error", ParDo.of(new
FailOnWriteErrorFn()));
+ return PCollectionRowTuple.empty(input.getPipeline());
Review Comment:

Returning an empty `PCollectionRowTuple` when `errorHandling` is null is
inconsistent with `outputCollectionNames()`, which advertises an `"errors"`
output. This can cause runtime errors (e.g., "key not found") when downstream
components or runners attempt to access the advertised output. Consider
returning a tuple that includes an empty `PCollection` for the `ERROR` tag to
maintain consistency with the provider's declared outputs.
##########
sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+@AutoService(SchemaTransformProvider.class)
+public class DatadogWriteSchemaTransformProvider
+ extends
TypedSchemaTransformProvider<DatadogWriteSchemaTransformConfiguration> {
+ private static final String IDENTIFIER =
"beam:schematransform:org.apache.beam:datadog_write:v1";
+ static final String INPUT = "input";
+ static final String OUTPUT = "output";
+ static final String ERROR = "errors";
+ public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+ public static final TupleTag<Void> OUTPUT_TAG = new TupleTag<Void>() {};
+ public static final TupleTag<DatadogEvent> EVENT_TAG = new
TupleTag<DatadogEvent>() {};
+
+ @Override
+ protected Class<DatadogWriteSchemaTransformConfiguration>
configurationClass() {
+ return DatadogWriteSchemaTransformConfiguration.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(DatadogWriteSchemaTransformConfiguration
configuration) {
+ return new DatadogWriteSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} input
collection names method. */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} output
collection names method. */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.singletonList(ERROR);
+ }
+
+ /**
+ * An implementation of {@link SchemaTransform} for Datadog Write jobs
configured using {@link
+ * DatadogWriteSchemaTransformConfiguration}.
+ */
+ static class DatadogWriteSchemaTransform extends SchemaTransform {
+ private final DatadogWriteSchemaTransformConfiguration configuration;
+
+ DatadogWriteSchemaTransform(DatadogWriteSchemaTransformConfiguration
configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ // Validate configuration parameters
+ configuration.validate();
+
+ // Obtain input rows
+ PCollection<Row> inputRows = input.get(INPUT);
+
+ // Check for errors
+ boolean handleErrors =
ErrorHandling.hasOutput(configuration.getErrorHandling());
+
+ Schema inputSchema = inputRows.getSchema();
+ Schema dynamicErrorSchema =
+ Schema.builder()
+ .addNullableRowField("failed_row", inputSchema)
+ .addNullableField("payload", Schema.FieldType.STRING)
+ .addNullableField("statusCode", Schema.FieldType.INT32)
+ .addNullableField("statusMessage", Schema.FieldType.STRING)
+ .build();
+
+ PCollectionTuple convertResult =
+ inputRows.apply(
+ "Convert to DatadogEvent",
+ ParDo.of(new RowToEventFn(handleErrors, ERROR_TAG,
dynamicErrorSchema))
+ .withOutputTags(EVENT_TAG, TupleTagList.of(ERROR_TAG)));
+
+ PCollection<DatadogEvent> datadogEvents =
+ convertResult.get(EVENT_TAG).setCoder(DatadogEventCoder.of());
+ PCollection<Row> conversionErrors =
+ convertResult
+ .get(ERROR_TAG)
+
.setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+ // Configure DatadogIO.Write
+ DatadogIO.Write.Builder builder =
+ DatadogIO.writeBuilder(configuration.getMinBatchCount())
+ .withUrl(configuration.getUrl())
+ .withApiKey(configuration.getApiKey());
+
+ Integer batchCount = configuration.getBatchCount();
+ if (batchCount != null) {
+ builder = builder.withBatchCount(batchCount);
+ }
+ Long maxBufferSize = configuration.getMaxBufferSize();
+ if (maxBufferSize != null) {
+ builder = builder.withMaxBufferSize(maxBufferSize);
+ }
+ Integer parallelism = configuration.getParallelism();
+ if (parallelism != null) {
+ builder = builder.withParallelism(parallelism);
+ }
+
+ DatadogIO.Write write = builder.build();
+
+ // Apply DatadogIO.Write
+ PCollection<DatadogWriteError> writeErrors = datadogEvents.apply("Write
To Datadog", write);
+
+ // Handle errors
+ ErrorHandling errorHandling = configuration.getErrorHandling();
+ if (errorHandling != null) {
+ PCollection<Row> writeErrorRows =
+ writeErrors
+ .apply(
+ "Convert Write Errors to Rows",
+ org.apache.beam.sdk.transforms.MapElements.into(
+ org.apache.beam.sdk.values.TypeDescriptors.rows())
+ .via(
+ error ->
+ Row.withSchema(dynamicErrorSchema)
+ .addValue(null)
+ .addValue(error.payload())
+ .addValue(error.statusCode())
+ .addValue(error.statusMessage())
+ .build()))
+
.setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+ PCollection<Row> allErrors =
+ org.apache.beam.sdk.values.PCollectionList.of(conversionErrors)
+ .and(writeErrorRows)
+ .apply("Flatten Errors",
org.apache.beam.sdk.transforms.Flatten.pCollections())
+
.setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+ return PCollectionRowTuple.of(errorHandling.getOutput(), allErrors);
Review Comment:

The output tag used here (`errorHandling.getOutput()`) should match the name
declared in `outputCollectionNames()`, which is currently fixed to `"errors"`
(via the `ERROR` constant). Using a dynamic name from the configuration that
differs from the advertised name can break discovery and cross-language
compatibility, as the expansion service and runners expect the output keys to
match the provider's contract. It is recommended to use the `ERROR` constant as
the tag to ensure consistency.
```suggestion
return PCollectionRowTuple.of(ERROR, allErrors);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]