This is an automated email from the ASF dual-hosted git repository.
ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3a99f94bd79 feat: Add LogElements transform to Java SDK (#38533)
3a99f94bd79 is described below
commit 3a99f94bd79ce669ca52eede7c81c271cff8d154
Author: Lalit Yadav <[email protected]>
AuthorDate: Thu Jun 4 11:02:41 2026 -0500
feat: Add LogElements transform to Java SDK (#38533)
* feat: Add Java LogElements transform
* Addressed LogElements review feedback
* Add LogElements tests
* Add LogElements level logging test
* Refactor LogElements logging test
---
.../apache/beam/sdk/transforms/LogElements.java | 234 +++++++++++++++++++++
.../beam/sdk/transforms/LogElementsTest.java | 108 ++++++++++
2 files changed, 342 insertions(+)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java
new file mode 100644
index 00000000000..75b41d2b044
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollection;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+/**
+ * {@link PTransform} for logging elements of a {@link PCollection}.
+ *
+ * <p>Each element is logged and then emitted unchanged.
+ *
+ * <pre>{@code
+ * PCollection<String> words = ...;
+ * PCollection<String> loggedWords =
words.apply(LogElements.info().withPrefix("word: "));
+ * }</pre>
+ *
+ * @param <T> the element type of the input {@link PCollection}
+ */
+public class LogElements<T> extends PTransform<PCollection<T>, PCollection<T>>
{
+ private static final Logger LOG = LoggerFactory.getLogger(LogElements.class);
+
+ private final Level level;
+ private final String prefix;
+ private final boolean withTimestamp;
+ private final boolean withWindow;
+ private final boolean withPaneInfo;
+
+ /** Returns a {@link LogElements} transform that logs elements at {@link
Level#TRACE}. */
+ public static <T> LogElements<T> trace() {
+ return of(Level.TRACE);
+ }
+
+ /** Returns a {@link LogElements} transform that logs elements at {@link
Level#DEBUG}. */
+ public static <T> LogElements<T> debug() {
+ return of(Level.DEBUG);
+ }
+
+ /** Returns a {@link LogElements} transform that logs elements at {@link
Level#INFO}. */
+ public static <T> LogElements<T> info() {
+ return of(Level.INFO);
+ }
+
+ /** Returns a {@link LogElements} transform that logs elements at {@link
Level#WARN}. */
+ public static <T> LogElements<T> warn() {
+ return of(Level.WARN);
+ }
+
+ /** Returns a {@link LogElements} transform that logs elements at {@link
Level#ERROR}. */
+ public static <T> LogElements<T> error() {
+ return of(Level.ERROR);
+ }
+
+ /** Returns a {@link LogElements} transform that logs elements at the given
level. */
+ public static <T> LogElements<T> of(Level level) {
+ return new LogElements<>(level, "", false, false, false);
+ }
+
+ private LogElements(
+ Level level, String prefix, boolean withTimestamp, boolean withWindow,
boolean withPaneInfo) {
+ this.level = Objects.requireNonNull(level, "level");
+ this.prefix = Objects.requireNonNull(prefix, "prefix");
+ this.withTimestamp = withTimestamp;
+ this.withWindow = withWindow;
+ this.withPaneInfo = withPaneInfo;
+ }
+
+ /** Returns a new {@link LogElements} transform with the given prefix before
each element. */
+ public LogElements<T> withPrefix(String prefix) {
+ return new LogElements<>(level, prefix, withTimestamp, withWindow,
withPaneInfo);
+ }
+
+ /** Returns a new {@link LogElements} transform that logs each element's
timestamp. */
+ public LogElements<T> withTimestamp() {
+ return new LogElements<>(level, prefix, true, withWindow, withPaneInfo);
+ }
+
+ /** Returns a new {@link LogElements} transform that logs each element's
window. */
+ public LogElements<T> withWindow() {
+ return new LogElements<>(level, prefix, withTimestamp, true, withPaneInfo);
+ }
+
+ /** Returns a new {@link LogElements} transform that logs each element's
pane info. */
+ public LogElements<T> withPaneInfo() {
+ return new LogElements<>(level, prefix, withTimestamp, withWindow, true);
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<T> input) {
+ return input.apply("Log", ParDo.of(new LoggingFn<>(this)));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .add(DisplayData.item("level", level.name()).withLabel("Log Level"))
+ .addIfNotDefault(DisplayData.item("prefix",
prefix).withLabel("Prefix"), "")
+ .addIfNotDefault(
+ DisplayData.item("withTimestamp", withTimestamp).withLabel("Log
Timestamp"), false)
+ .addIfNotDefault(DisplayData.item("withWindow",
withWindow).withLabel("Log Window"), false)
+ .addIfNotDefault(
+ DisplayData.item("withPaneInfo", withPaneInfo).withLabel("Log Pane
Info"), false);
+ }
+
+ static String formatForLogging(
+ @Nullable Object element,
+ String prefix,
+ boolean withTimestamp,
+ boolean withWindow,
+ boolean withPaneInfo,
+ Instant timestamp,
+ BoundedWindow window,
+ PaneInfo paneInfo) {
+ StringBuilder builder = new StringBuilder(prefix).append(element);
+ if (withTimestamp) {
+ builder.append(", timestamp=").append(timestamp);
+ }
+ if (withWindow) {
+ builder.append(", window=").append(window);
+ }
+ if (withPaneInfo) {
+ builder.append(", paneInfo=").append(paneInfo);
+ }
+ return builder.toString();
+ }
+
+ @VisibleForTesting
+ static void log(Level level, String message) {
+ switch (level) {
+ case TRACE:
+ LOG.trace("{}", message);
+ break;
+ case DEBUG:
+ LOG.debug("{}", message);
+ break;
+ case INFO:
+ LOG.info("{}", message);
+ break;
+ case WARN:
+ LOG.warn("{}", message);
+ break;
+ case ERROR:
+ LOG.error("{}", message);
+ break;
+ default:
+ throw unsupportedLogLevel(level);
+ }
+ }
+
+ private static boolean isLoggingEnabled(Level level) {
+ switch (level) {
+ case TRACE:
+ return LOG.isTraceEnabled();
+ case DEBUG:
+ return LOG.isDebugEnabled();
+ case INFO:
+ return LOG.isInfoEnabled();
+ case WARN:
+ return LOG.isWarnEnabled();
+ case ERROR:
+ return LOG.isErrorEnabled();
+ default:
+ throw unsupportedLogLevel(level);
+ }
+ }
+
+ private static IllegalArgumentException unsupportedLogLevel(Level level) {
+ return new IllegalArgumentException("Unsupported log level: " + level);
+ }
+
+ private static class LoggingFn<T> extends DoFn<T, T> {
+ private final Level level;
+ private final String prefix;
+ private final boolean withTimestamp;
+ private final boolean withWindow;
+ private final boolean withPaneInfo;
+
+ private LoggingFn(LogElements<T> transform) {
+ this.level = transform.level;
+ this.prefix = transform.prefix;
+ this.withTimestamp = transform.withTimestamp;
+ this.withWindow = transform.withWindow;
+ this.withPaneInfo = transform.withPaneInfo;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element T element,
+ @DoFn.Timestamp Instant timestamp,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ OutputReceiver<T> receiver) {
+ if (isLoggingEnabled(level)) {
+ log(
+ level,
+ formatForLogging(
+ element,
+ prefix,
+ withTimestamp,
+ withWindow,
+ withPaneInfo,
+ timestamp,
+ window,
+ paneInfo));
+ }
+ receiver.output(element);
+ }
+ }
+}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java
new file mode 100644
index 00000000000..4d5409e362f
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.event.Level;
+
+/** Tests for {@link LogElements}. */
+@RunWith(JUnit4.class)
+public class LogElementsTest {
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+ @Rule public ExpectedLogs expectedLogs =
ExpectedLogs.none(LogElements.class);
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testLogElementsPreservesElements() {
+ List<String> elements = Arrays.asList("a", "b", "c");
+
+ PCollection<String> output =
+ pipeline.apply(Create.of(elements)).apply(LogElements.<String>info());
+
+ PAssert.that(output).containsInAnyOrder(elements);
+ pipeline.run();
+ }
+
+ @Test
+ public void testFormatForLoggingIncludesConfiguredMetadata() {
+ Instant timestamp = new Instant(0);
+ IntervalWindow window = new IntervalWindow(timestamp,
Duration.standardMinutes(1));
+
+ String message =
+ LogElements.formatForLogging(
+ "a", "row: ", true, true, true, timestamp, window,
PaneInfo.NO_FIRING);
+
+ assertThat(message, containsString("row: a"));
+ assertThat(message, containsString("timestamp=1970-01-01T00:00:00.000Z"));
+ assertThat(
+ message,
containsString("window=[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)"));
+ assertThat(message, containsString("paneInfo=PaneInfo.NO_FIRING"));
+ }
+
+ @Test
+ public void testLogElementsLogsAtConfiguredLevels() {
+ LogElements.log(Level.TRACE, "trace: trace-element");
+ LogElements.log(Level.DEBUG, "debug: debug-element");
+ LogElements.log(Level.INFO, "info: info-element");
+ LogElements.log(Level.WARN, "warn: warn-element");
+ LogElements.log(Level.ERROR, "error: error-element");
+
+ expectedLogs.verifyTrace("trace: trace-element");
+ expectedLogs.verifyDebug("debug: debug-element");
+ expectedLogs.verifyInfo("info: info-element");
+ expectedLogs.verifyWarn("warn: warn-element");
+ expectedLogs.verifyError("error: error-element");
+ }
+
+ @Test
+ public void testDisplayData() {
+ DisplayData displayData =
+ DisplayData.from(
+ LogElements.of(Level.WARN)
+ .withPrefix("row: ")
+ .withTimestamp()
+ .withWindow()
+ .withPaneInfo());
+
+ assertThat(displayData, hasDisplayItem("level", "WARN"));
+ assertThat(displayData, hasDisplayItem("prefix", "row: "));
+ assertThat(displayData, hasDisplayItem("withTimestamp", true));
+ assertThat(displayData, hasDisplayItem("withWindow", true));
+ assertThat(displayData, hasDisplayItem("withPaneInfo", true));
+ }
+}