gemini-code-assist[bot] commented on code in PR #38362:
URL: https://github.com/apache/beam/pull/38362#discussion_r3184197410


##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -534,6 +561,171 @@ def temp_pubsub_emulator(project_id="apache-beam-testing"):
     yield created_topic_object.name
 
 
+class DatadogContainer(DockerContainer):
+  """
+  DatadogContainer starts a Datadog agent container for integration tests.
+  It exposes ports for DogStatsD (metrics) and the trace agent (APM).
+  """
+  def __init__(self, image="datadog/agent:latest"):
+    super().__init__(image)
+    self.statsd_port = 8125
+    self.trace_port = 8126
+    # An API key is required, but for local testing against the agent,
+    # it doesn't have to be a valid one.
+    self.with_env("DD_API_KEY", "dummy_key_for_testing")
+    # Redirect agent's own telemetry to prevent 403 errors from the real site
+    self.with_env("DD_DD_URL", "http://localhost:1234")
+    # Enable the agent's log collection for these integration tests
+    self.with_env("DD_LOGS_ENABLED", "true")
+    self.with_exposed_ports(self.statsd_port, self.trace_port)
+
+  def start(self):
+    super().start()
+    # Wait for the agent to be ready to receive traces and metrics.
+    # "Agent started" indicates the core agent is up.
+    # Disabling this wait as the container is reported as healthy and
+    # this specific log might not appear with dummy URLs.
+    # wait_for_logs(self, "Agent started", timeout=120)
+    return self
+
+  def get_statsd_host(self):
+    return self.get_container_host_ip()
+
+  def get_statsd_port(self):
+    return self.get_exposed_port(self.statsd_port)
+
+  def get_trace_agent_host(self):
+    return self.get_container_host_ip()
+
+  def get_trace_agent_port(self):
+    return self.get_exposed_port(self.trace_port)
+
+  def get_api_key(self):
+    return "dummy_key_for_testing"
+
+  def get_logs_url(self):
+    # The trace agent and logs agent listen on the same port by default
+    return \
+      f"http://{self.get_container_host_ip()}:{self.get_trace_agent_port()}"

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The line continuation backslash is unnecessary here; the return expression fits on a single line. Additionally, the indentation of the continued line is inconsistent with the rest of the method. The code can be simplified into a single line for better readability.
   
   ```python
       return f"http://{self.get_container_host_ip()}:{self.get_trace_agent_port()}"
   ```
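   If the joined line were ever to exceed the formatter's line-length limit, a parenthesized expression (a sketch, not part of the original suggestion) would also avoid the backslash:

   ```python
       # Implicit continuation inside parentheses; adjacent f-strings concatenate.
       return (
           f"http://{self.get_container_host_ip()}:"
           f"{self.get_trace_agent_port()}")
   ```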



##########
sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+@AutoService(SchemaTransformProvider.class)
+public class DatadogWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<DatadogWriteSchemaTransformConfiguration> {
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:datadog_write:v1";
+  static final String INPUT = "input";
+  static final String OUTPUT = "output";
+  static final String ERROR = "errors";
+  public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Void> OUTPUT_TAG = new TupleTag<Void>() {};
+  public static final TupleTag<DatadogEvent> EVENT_TAG = new TupleTag<DatadogEvent>() {};
+
+  @Override
+  protected Class<DatadogWriteSchemaTransformConfiguration> configurationClass() {
+    return DatadogWriteSchemaTransformConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(DatadogWriteSchemaTransformConfiguration configuration) {
+    return new DatadogWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} input collection names method. */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} output collection names method. */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(ERROR);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for Datadog Write jobs configured using {@link
+   * DatadogWriteSchemaTransformConfiguration}.
+   */
+  static class DatadogWriteSchemaTransform extends SchemaTransform {
+    private final DatadogWriteSchemaTransformConfiguration configuration;
+
+    DatadogWriteSchemaTransform(DatadogWriteSchemaTransformConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      // Validate configuration parameters
+      configuration.validate();
+
+      // Obtain input rows
+      PCollection<Row> inputRows = input.get(INPUT);
+
+      // Check for errors
+      boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
+
+      Schema inputSchema = inputRows.getSchema();
+      Schema dynamicErrorSchema =
+          Schema.builder()
+              .addNullableRowField("failed_row", inputSchema)
+              .addNullableField("payload", Schema.FieldType.STRING)
+              .addNullableField("statusCode", Schema.FieldType.INT32)
+              .addNullableField("statusMessage", Schema.FieldType.STRING)
+              .build();
+
+      PCollectionTuple convertResult =
+          inputRows.apply(
+              "Convert to DatadogEvent",
+              ParDo.of(new RowToEventFn(handleErrors, ERROR_TAG, dynamicErrorSchema))
+                  .withOutputTags(EVENT_TAG, TupleTagList.of(ERROR_TAG)));
+
+      PCollection<DatadogEvent> datadogEvents =
+          convertResult.get(EVENT_TAG).setCoder(DatadogEventCoder.of());
+      PCollection<Row> conversionErrors =
+          convertResult
+              .get(ERROR_TAG)
+              .setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+      // Configure DatadogIO.Write
+      DatadogIO.Write.Builder builder =
+          DatadogIO.writeBuilder(configuration.getMinBatchCount())
+              .withUrl(configuration.getUrl())
+              .withApiKey(configuration.getApiKey());
+
+      Integer batchCount = configuration.getBatchCount();
+      if (batchCount != null) {
+        builder = builder.withBatchCount(batchCount);
+      }
+      Long maxBufferSize = configuration.getMaxBufferSize();
+      if (maxBufferSize != null) {
+        builder = builder.withMaxBufferSize(maxBufferSize);
+      }
+      Integer parallelism = configuration.getParallelism();
+      if (parallelism != null) {
+        builder = builder.withParallelism(parallelism);
+      }
+
+      DatadogIO.Write write = builder.build();
+
+      // Apply DatadogIO.Write
+      PCollection<DatadogWriteError> writeErrors = datadogEvents.apply("Write To Datadog", write);
+
+      // Handle errors
+      ErrorHandling errorHandling = configuration.getErrorHandling();
+      if (errorHandling != null) {
+        PCollection<Row> writeErrorRows =
+            writeErrors
+                .apply(
+                    "Convert Write Errors to Rows",
+                    org.apache.beam.sdk.transforms.MapElements.into(
+                            org.apache.beam.sdk.values.TypeDescriptors.rows())
+                        .via(
+                            error ->
+                                Row.withSchema(dynamicErrorSchema)
+                                    .addValue(null)
+                                    .addValue(error.payload())
+                                    .addValue(error.statusCode())
+                                    .addValue(error.statusMessage())
+                                    .build()))
+                .setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+        PCollection<Row> allErrors =
+            org.apache.beam.sdk.values.PCollectionList.of(conversionErrors)
+                .and(writeErrorRows)
+                .apply("Flatten Errors", org.apache.beam.sdk.transforms.Flatten.pCollections())
+                .setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+        return PCollectionRowTuple.of(errorHandling.getOutput(), allErrors);
+      } else {
+        writeErrors.apply("Fail on Write Error", ParDo.of(new FailOnWriteErrorFn()));
+        return PCollectionRowTuple.empty(input.getPipeline());

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Returning an empty `PCollectionRowTuple` when `errorHandling` is null is 
inconsistent with `outputCollectionNames()`, which advertises an `"errors"` 
output. This can cause runtime errors (e.g., "key not found") when downstream 
components or runners attempt to access the advertised output. Consider 
returning a tuple that includes an empty `PCollection` for the `ERROR` tag to 
maintain consistency with the provider's declared outputs.
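   A minimal sketch of that approach (assuming an empty `PCollection` created with `Create.empty` and a `RowCoder` over the dynamic error schema is acceptable here) could look like:

   ```java
   // Sketch only: keep the fail-fast ParDo, but also return an empty error
   // collection under the advertised ERROR key so the expanded outputs always
   // match outputCollectionNames().
   writeErrors.apply("Fail on Write Error", ParDo.of(new FailOnWriteErrorFn()));
   PCollection<Row> emptyErrors =
       input
           .getPipeline()
           .apply(
               "No Errors",
               org.apache.beam.sdk.transforms.Create.empty(
                   org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema)));
   return PCollectionRowTuple.of(ERROR, emptyErrors);
   ```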



##########
sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.datadog;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+@AutoService(SchemaTransformProvider.class)
+public class DatadogWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<DatadogWriteSchemaTransformConfiguration> {
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:datadog_write:v1";
+  static final String INPUT = "input";
+  static final String OUTPUT = "output";
+  static final String ERROR = "errors";
+  public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Void> OUTPUT_TAG = new TupleTag<Void>() {};
+  public static final TupleTag<DatadogEvent> EVENT_TAG = new TupleTag<DatadogEvent>() {};
+
+  @Override
+  protected Class<DatadogWriteSchemaTransformConfiguration> configurationClass() {
+    return DatadogWriteSchemaTransformConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(DatadogWriteSchemaTransformConfiguration configuration) {
+    return new DatadogWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} input collection names method. */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} output collection names method. */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(ERROR);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for Datadog Write jobs configured using {@link
+   * DatadogWriteSchemaTransformConfiguration}.
+   */
+  static class DatadogWriteSchemaTransform extends SchemaTransform {
+    private final DatadogWriteSchemaTransformConfiguration configuration;
+
+    DatadogWriteSchemaTransform(DatadogWriteSchemaTransformConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      // Validate configuration parameters
+      configuration.validate();
+
+      // Obtain input rows
+      PCollection<Row> inputRows = input.get(INPUT);
+
+      // Check for errors
+      boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
+
+      Schema inputSchema = inputRows.getSchema();
+      Schema dynamicErrorSchema =
+          Schema.builder()
+              .addNullableRowField("failed_row", inputSchema)
+              .addNullableField("payload", Schema.FieldType.STRING)
+              .addNullableField("statusCode", Schema.FieldType.INT32)
+              .addNullableField("statusMessage", Schema.FieldType.STRING)
+              .build();
+
+      PCollectionTuple convertResult =
+          inputRows.apply(
+              "Convert to DatadogEvent",
+              ParDo.of(new RowToEventFn(handleErrors, ERROR_TAG, dynamicErrorSchema))
+                  .withOutputTags(EVENT_TAG, TupleTagList.of(ERROR_TAG)));
+
+      PCollection<DatadogEvent> datadogEvents =
+          convertResult.get(EVENT_TAG).setCoder(DatadogEventCoder.of());
+      PCollection<Row> conversionErrors =
+          convertResult
+              .get(ERROR_TAG)
+              .setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+      // Configure DatadogIO.Write
+      DatadogIO.Write.Builder builder =
+          DatadogIO.writeBuilder(configuration.getMinBatchCount())
+              .withUrl(configuration.getUrl())
+              .withApiKey(configuration.getApiKey());
+
+      Integer batchCount = configuration.getBatchCount();
+      if (batchCount != null) {
+        builder = builder.withBatchCount(batchCount);
+      }
+      Long maxBufferSize = configuration.getMaxBufferSize();
+      if (maxBufferSize != null) {
+        builder = builder.withMaxBufferSize(maxBufferSize);
+      }
+      Integer parallelism = configuration.getParallelism();
+      if (parallelism != null) {
+        builder = builder.withParallelism(parallelism);
+      }
+
+      DatadogIO.Write write = builder.build();
+
+      // Apply DatadogIO.Write
+      PCollection<DatadogWriteError> writeErrors = datadogEvents.apply("Write To Datadog", write);
+
+      // Handle errors
+      ErrorHandling errorHandling = configuration.getErrorHandling();
+      if (errorHandling != null) {
+        PCollection<Row> writeErrorRows =
+            writeErrors
+                .apply(
+                    "Convert Write Errors to Rows",
+                    org.apache.beam.sdk.transforms.MapElements.into(
+                            org.apache.beam.sdk.values.TypeDescriptors.rows())
+                        .via(
+                            error ->
+                                Row.withSchema(dynamicErrorSchema)
+                                    .addValue(null)
+                                    .addValue(error.payload())
+                                    .addValue(error.statusCode())
+                                    .addValue(error.statusMessage())
+                                    .build()))
+                .setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+        PCollection<Row> allErrors =
+            org.apache.beam.sdk.values.PCollectionList.of(conversionErrors)
+                .and(writeErrorRows)
+                .apply("Flatten Errors", org.apache.beam.sdk.transforms.Flatten.pCollections())
+                .setCoder(org.apache.beam.sdk.coders.RowCoder.of(dynamicErrorSchema));
+
+        return PCollectionRowTuple.of(errorHandling.getOutput(), allErrors);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The output tag used here (`errorHandling.getOutput()`) should match the name 
declared in `outputCollectionNames()`, which is currently fixed to `"errors"` 
(via the `ERROR` constant). Using a dynamic name from the configuration that 
differs from the advertised name can break discovery and cross-language 
compatibility, as the expansion service and runners expect the output keys to 
match the provider's contract. It is recommended to use the `ERROR` constant as 
the tag to ensure consistency.
   
   ```suggestion
           return PCollectionRowTuple.of(ERROR, allErrors);
   ```
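   If a user-configurable output name is genuinely desired, `outputCollectionNames()` would need to return that same configured name; keeping the fixed `ERROR` constant on both sides is the simpler and safer contract.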



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
