This is an automated email from the ASF dual-hosted git repository.

ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a99f94bd79 feat: Add LogElements transform to Java SDK (#38533)
3a99f94bd79 is described below

commit 3a99f94bd79ce669ca52eede7c81c271cff8d154
Author: Lalit Yadav <[email protected]>
AuthorDate: Thu Jun 4 11:02:41 2026 -0500

    feat: Add LogElements transform to Java SDK (#38533)
    
    * feat: Add Java LogElements transform
    
    * Addressed LogElements review feedback
    
    * Add LogElements tests
    
    * Add LogElements level logging test
    
    * Refactor LogElements logging test
---
 .../apache/beam/sdk/transforms/LogElements.java    | 234 +++++++++++++++++++++
 .../beam/sdk/transforms/LogElementsTest.java       | 108 ++++++++++
 2 files changed, 342 insertions(+)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java
new file mode 100644
index 00000000000..75b41d2b044
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+/**
+ * {@link PTransform} for logging elements of a {@link PCollection}.
+ *
+ * <p>Each element is logged and then emitted unchanged. The log message is only built when the
+ * configured level is enabled on the logger of this class.
+ *
+ * <p>When configuration methods are chained onto one of the static factories, the element type
+ * cannot be inferred from the enclosing {@code apply} call and must be supplied explicitly:
+ *
+ * <pre>{@code
+ * PCollection<String> words = ...;
+ * PCollection<String> loggedWords =
+ *     words.apply(LogElements.<String>info().withPrefix("word: "));
+ * }</pre>
+ *
+ * @param <T> the element type of the input {@link PCollection}
+ */
+public class LogElements<T> extends PTransform<PCollection<T>, PCollection<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(LogElements.class);
+
+  private final Level level;
+  private final String prefix;
+  private final boolean withTimestamp;
+  private final boolean withWindow;
+  private final boolean withPaneInfo;
+
+  /** Returns a {@link LogElements} transform that logs elements at {@link Level#TRACE}. */
+  public static <T> LogElements<T> trace() {
+    return of(Level.TRACE);
+  }
+
+  /** Returns a {@link LogElements} transform that logs elements at {@link Level#DEBUG}. */
+  public static <T> LogElements<T> debug() {
+    return of(Level.DEBUG);
+  }
+
+  /** Returns a {@link LogElements} transform that logs elements at {@link Level#INFO}. */
+  public static <T> LogElements<T> info() {
+    return of(Level.INFO);
+  }
+
+  /** Returns a {@link LogElements} transform that logs elements at {@link Level#WARN}. */
+  public static <T> LogElements<T> warn() {
+    return of(Level.WARN);
+  }
+
+  /** Returns a {@link LogElements} transform that logs elements at {@link Level#ERROR}. */
+  public static <T> LogElements<T> error() {
+    return of(Level.ERROR);
+  }
+
+  /**
+   * Returns a {@link LogElements} transform that logs elements at the given level.
+   *
+   * <p>The returned transform has an empty prefix and logs neither timestamp, window nor pane
+   * info.
+   *
+   * @param level the level to log at
+   * @throws NullPointerException if {@code level} is null
+   */
+  public static <T> LogElements<T> of(Level level) {
+    return new LogElements<>(level, "", false, false, false);
+  }
+
+  private LogElements(
+      Level level, String prefix, boolean withTimestamp, boolean withWindow, boolean withPaneInfo) {
+    this.level = Objects.requireNonNull(level, "level");
+    this.prefix = Objects.requireNonNull(prefix, "prefix");
+    this.withTimestamp = withTimestamp;
+    this.withWindow = withWindow;
+    this.withPaneInfo = withPaneInfo;
+  }
+
+  /**
+   * Returns a new {@link LogElements} transform with the given prefix before each element.
+   *
+   * @param prefix the text placed directly in front of each element, with no separator added
+   * @throws NullPointerException if {@code prefix} is null
+   */
+  public LogElements<T> withPrefix(String prefix) {
+    return new LogElements<>(level, prefix, withTimestamp, withWindow, withPaneInfo);
+  }
+
+  /** Returns a new {@link LogElements} transform that logs each element's timestamp. */
+  public LogElements<T> withTimestamp() {
+    return new LogElements<>(level, prefix, true, withWindow, withPaneInfo);
+  }
+
+  /** Returns a new {@link LogElements} transform that logs each element's window. */
+  public LogElements<T> withWindow() {
+    return new LogElements<>(level, prefix, withTimestamp, true, withPaneInfo);
+  }
+
+  /** Returns a new {@link LogElements} transform that logs each element's pane info. */
+  public LogElements<T> withPaneInfo() {
+    return new LogElements<>(level, prefix, withTimestamp, withWindow, true);
+  }
+
+  @Override
+  public PCollection<T> expand(PCollection<T> input) {
+    return input.apply("Log", ParDo.of(new LoggingFn<>(this)));
+  }
+
+  /**
+   * Registers the log level, plus every option that differs from the defaults used by {@link
+   * #of(Level)}.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("level", level.name()).withLabel("Log Level"))
+        .addIfNotDefault(DisplayData.item("prefix", prefix).withLabel("Prefix"), "")
+        .addIfNotDefault(
+            DisplayData.item("withTimestamp", withTimestamp).withLabel("Log Timestamp"), false)
+        .addIfNotDefault(DisplayData.item("withWindow", withWindow).withLabel("Log Window"), false)
+        .addIfNotDefault(
+            DisplayData.item("withPaneInfo", withPaneInfo).withLabel("Log Pane Info"), false);
+  }
+
+  /**
+   * Builds the message logged for a single element.
+   *
+   * <p>The message starts with {@code prefix} immediately followed by the element. For each
+   * enabled flag a {@code ", timestamp="}, {@code ", window="} or {@code ", paneInfo="} section is
+   * appended, in that order. A null element is rendered as {@code "null"}.
+   *
+   * @param element the element to render, possibly null
+   * @param prefix the text placed in front of the element
+   * @param withTimestamp whether to append {@code timestamp}
+   * @param withWindow whether to append {@code window}
+   * @param withPaneInfo whether to append {@code paneInfo}
+   * @param timestamp the element's timestamp; only read when {@code withTimestamp} is set
+   * @param window the element's window; only read when {@code withWindow} is set
+   * @param paneInfo the element's pane info; only read when {@code withPaneInfo} is set
+   * @return the formatted message
+   */
+  @VisibleForTesting
+  static String formatForLogging(
+      @Nullable Object element,
+      String prefix,
+      boolean withTimestamp,
+      boolean withWindow,
+      boolean withPaneInfo,
+      Instant timestamp,
+      BoundedWindow window,
+      PaneInfo paneInfo) {
+    StringBuilder builder = new StringBuilder(prefix).append(element);
+    if (withTimestamp) {
+      builder.append(", timestamp=").append(timestamp);
+    }
+    if (withWindow) {
+      builder.append(", window=").append(window);
+    }
+    if (withPaneInfo) {
+      builder.append(", paneInfo=").append(paneInfo);
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Writes {@code message} to this class's logger at the given level.
+   *
+   * <p>The message is passed as an argument to a {@code "{}"} pattern rather than as the pattern
+   * itself, so any braces it contains are never treated as placeholders.
+   *
+   * @throws IllegalArgumentException if {@code level} is not one of the five handled levels
+   */
+  @VisibleForTesting
+  static void log(Level level, String message) {
+    switch (level) {
+      case TRACE:
+        LOG.trace("{}", message);
+        break;
+      case DEBUG:
+        LOG.debug("{}", message);
+        break;
+      case INFO:
+        LOG.info("{}", message);
+        break;
+      case WARN:
+        LOG.warn("{}", message);
+        break;
+      case ERROR:
+        LOG.error("{}", message);
+        break;
+      default:
+        throw unsupportedLogLevel(level);
+    }
+  }
+
+  /**
+   * Returns whether this class's logger is enabled at the given level.
+   *
+   * @throws IllegalArgumentException if {@code level} is not one of the five handled levels
+   */
+  private static boolean isLoggingEnabled(Level level) {
+    switch (level) {
+      case TRACE:
+        return LOG.isTraceEnabled();
+      case DEBUG:
+        return LOG.isDebugEnabled();
+      case INFO:
+        return LOG.isInfoEnabled();
+      case WARN:
+        return LOG.isWarnEnabled();
+      case ERROR:
+        return LOG.isErrorEnabled();
+      default:
+        throw unsupportedLogLevel(level);
+    }
+  }
+
+  /** Creates (without throwing) the exception used for a level neither switch above handles. */
+  private static IllegalArgumentException unsupportedLogLevel(Level level) {
+    return new IllegalArgumentException("Unsupported log level: " + level);
+  }
+
+  /**
+   * {@link DoFn} that logs each element and then outputs it unchanged.
+   *
+   * <p>It copies the transform's settings into its own fields instead of holding a reference to
+   * the {@link LogElements} instance.
+   */
+  private static class LoggingFn<T> extends DoFn<T, T> {
+    private final Level level;
+    private final String prefix;
+    private final boolean withTimestamp;
+    private final boolean withWindow;
+    private final boolean withPaneInfo;
+
+    private LoggingFn(LogElements<T> transform) {
+      this.level = transform.level;
+      this.prefix = transform.prefix;
+      this.withTimestamp = transform.withTimestamp;
+      this.withWindow = transform.withWindow;
+      this.withPaneInfo = transform.withPaneInfo;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element T element,
+        @DoFn.Timestamp Instant timestamp,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        OutputReceiver<T> receiver) {
+      // Skip building the message entirely when the level is disabled.
+      if (isLoggingEnabled(level)) {
+        log(
+            level,
+            formatForLogging(
+                element,
+                prefix,
+                withTimestamp,
+                withWindow,
+                withPaneInfo,
+                timestamp,
+                window,
+                paneInfo));
+      }
+      receiver.output(element);
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java
new file mode 100644
index 00000000000..4d5409e362f
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.event.Level;
+
+/** Unit tests covering the {@link LogElements} transform. */
+@RunWith(JUnit4.class)
+public class LogElementsTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(LogElements.class);
+
+  /** The transform must emit exactly the elements it received. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testLogElementsPreservesElements() {
+    List<String> input = Arrays.asList("a", "b", "c");
+    PCollection<String> source = pipeline.apply(Create.of(input));
+
+    PCollection<String> logged = source.apply(LogElements.<String>info());
+
+    PAssert.that(logged).containsInAnyOrder(input);
+    pipeline.run();
+  }
+
+  /** With every flag enabled, the message carries prefix, timestamp, window and pane info. */
+  @Test
+  public void testFormatForLoggingIncludesConfiguredMetadata() {
+    Instant epoch = new Instant(0);
+    IntervalWindow firstMinute = new IntervalWindow(epoch, Duration.standardMinutes(1));
+
+    String formatted =
+        LogElements.formatForLogging(
+            "a", "row: ", true, true, true, epoch, firstMinute, PaneInfo.NO_FIRING);
+
+    assertThat(formatted, containsString("row: a"));
+    assertThat(formatted, containsString("timestamp=1970-01-01T00:00:00.000Z"));
+    assertThat(
+        formatted,
+        containsString("window=[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)"));
+    assertThat(formatted, containsString("paneInfo=PaneInfo.NO_FIRING"));
+  }
+
+  /** Each supported level must route the message to the matching logger method. */
+  @Test
+  public void testLogElementsLogsAtConfiguredLevels() {
+    String traceMessage = "trace: trace-element";
+    String debugMessage = "debug: debug-element";
+    String infoMessage = "info: info-element";
+    String warnMessage = "warn: warn-element";
+    String errorMessage = "error: error-element";
+
+    LogElements.log(Level.TRACE, traceMessage);
+    LogElements.log(Level.DEBUG, debugMessage);
+    LogElements.log(Level.INFO, infoMessage);
+    LogElements.log(Level.WARN, warnMessage);
+    LogElements.log(Level.ERROR, errorMessage);
+
+    expectedLogs.verifyTrace(traceMessage);
+    expectedLogs.verifyDebug(debugMessage);
+    expectedLogs.verifyInfo(infoMessage);
+    expectedLogs.verifyWarn(warnMessage);
+    expectedLogs.verifyError(errorMessage);
+  }
+
+  /** A fully configured transform must expose every setting as display data. */
+  @Test
+  public void testDisplayData() {
+    LogElements<Object> configured =
+        LogElements.of(Level.WARN).withPrefix("row: ").withTimestamp().withWindow().withPaneInfo();
+
+    DisplayData displayData = DisplayData.from(configured);
+
+    assertThat(displayData, hasDisplayItem("level", "WARN"));
+    assertThat(displayData, hasDisplayItem("prefix", "row: "));
+    assertThat(displayData, hasDisplayItem("withTimestamp", true));
+    assertThat(displayData, hasDisplayItem("withWindow", true));
+    assertThat(displayData, hasDisplayItem("withPaneInfo", true));
+  }
+}

Reply via email to