This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f2d0dc8aecc Improve LogElements to show pane_info and timestamps in seconds. (#35387)
f2d0dc8aecc is described below

commit f2d0dc8aecc116193c319773770c64f08f3ae833
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Jun 23 16:11:32 2025 -0400

    Improve LogElements to show pane_info and timestamps in seconds. (#35387)
    
    * Improve LogElements to show pane_info and timestamps in seconds.
    
    * Rename the new argument to use_epoch. Adjust argument order.
---
 sdks/python/apache_beam/transforms/util.py      | 42 ++++++++++++++++++----
 sdks/python/apache_beam/transforms/util_test.py | 46 +++++++++++++++++++++++--
 2 files changed, 79 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 08e111992ef..07d82290212 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -1445,30 +1445,49 @@ class LogElements(PTransform):
     level: (optional) The logging level for the output (e.g. `logging.DEBUG`,
         `logging.INFO`, `logging.WARNING`, `logging.ERROR`). If not specified,
         the log is printed to stdout.
+    with_pane_info (bool): (optional) Whether to include element's pane info.
+    use_epoch_time (bool): (optional) Whether to display epoch timestamps.
   """
   class _LoggingFn(DoFn):
     def __init__(
-        self, prefix='', with_timestamp=False, with_window=False, level=None):
+        self,
+        prefix='',
+        with_timestamp=False,
+        with_window=False,
+        level=None,
+        with_pane_info=False,
+        use_epoch_time=False):
       super().__init__()
       self.prefix = prefix
       self.with_timestamp = with_timestamp
       self.with_window = with_window
       self.level = level
+      self.with_pane_info = with_pane_info
+      self.use_epoch_time = use_epoch_time
+
+    def format_timestamp(self, timestamp):
+      if self.use_epoch_time:
+        return timestamp.seconds()
+      return timestamp.to_rfc3339()
 
     def process(
         self,
         element,
         timestamp=DoFn.TimestampParam,
         window=DoFn.WindowParam,
+        pane_info=DoFn.PaneInfoParam,
         **kwargs):
       log_line = self.prefix + str(element)
 
       if self.with_timestamp:
-        log_line += ', timestamp=' + repr(timestamp.to_rfc3339())
+        log_line += ', timestamp=' + repr(self.format_timestamp(timestamp))
 
       if self.with_window:
-        log_line += ', window(start=' + window.start.to_rfc3339()
-        log_line += ', end=' + window.end.to_rfc3339() + ')'
+        log_line += ', window(start=' + str(self.format_timestamp(window.start))
+        log_line += ', end=' + str(self.format_timestamp(window.end)) + ')'
+
+      if self.with_pane_info:
+        log_line += ', pane_info=' + repr(pane_info)
 
       if self.level == logging.DEBUG:
         logging.debug(log_line)
@@ -1491,17 +1510,28 @@ class LogElements(PTransform):
       prefix='',
       with_timestamp=False,
       with_window=False,
-      level=None):
+      level=None,
+      with_pane_info=False,
+      use_epoch_time=False,
+  ):
     super().__init__(label)
     self.prefix = prefix
     self.with_timestamp = with_timestamp
     self.with_window = with_window
+    self.with_pane_info = with_pane_info
+    self.use_epoch_time = use_epoch_time
     self.level = level
 
   def expand(self, input):
     return input | ParDo(
         self._LoggingFn(
-            self.prefix, self.with_timestamp, self.with_window, self.level))
+            self.prefix,
+            self.with_timestamp,
+            self.with_window,
+            self.level,
+            self.with_pane_info,
+            self.use_epoch_time,
+        ))
 
 
 class Reify(object):
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index ac703dd53e5..a9bec0df973 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -1613,7 +1613,10 @@ class LogElementsTest(unittest.TestCase):
           ])
           | beam.WindowInto(FixedWindows(60))
           | util.LogElements(
-              prefix='prefix_', with_window=True, with_timestamp=True))
+              prefix='prefix_',
+              with_window=True,
+              with_timestamp=True,
+              with_pane_info=True))
 
     request.captured_stdout = capsys.readouterr().out
     return result
@@ -1622,9 +1625,46 @@ class LogElementsTest(unittest.TestCase):
   def test_stdout_logs(self):
     assert self.captured_stdout == \
       ("prefix_event, timestamp='2022-10-01T00:00:00Z', "
-       "window(start=2022-10-01T00:00:00Z, end=2022-10-01T00:01:00Z)\n"
+       "window(start=2022-10-01T00:00:00Z, end=2022-10-01T00:01:00Z), "
+       "pane_info=PaneInfo(first: True, last: True, timing: UNKNOWN, "
+       "index: 0, nonspeculative_index: 0)\n"
        "prefix_event, timestamp='2022-10-02T00:00:00Z', "
-       "window(start=2022-10-02T00:00:00Z, end=2022-10-02T00:01:00Z)\n"), \
+       "window(start=2022-10-02T00:00:00Z, end=2022-10-02T00:01:00Z), "
+       "pane_info=PaneInfo(first: True, last: True, timing: UNKNOWN, "
+       "index: 0, nonspeculative_index: 0)\n"), \
+      f'Received from stdout: {self.captured_stdout}'
+
+  @pytest.fixture(scope="function")
+  def _capture_stdout_log_without_rfc3339(request, capsys):
+    with TestPipeline() as p:
+      result = (
+          p | beam.Create([
+              TimestampedValue(
+                  "event",
+                  datetime(2022, 10, 1, 0, 0, 0, 0,
+                           tzinfo=pytz.UTC).timestamp()),
+              TimestampedValue(
+                  "event",
+                  datetime(2022, 10, 2, 0, 0, 0, 0,
+                           tzinfo=pytz.UTC).timestamp()),
+          ])
+          | beam.WindowInto(FixedWindows(60))
+          | util.LogElements(
+              prefix='prefix_',
+              with_window=True,
+              with_timestamp=True,
+              use_epoch_time=True))
+
+    request.captured_stdout = capsys.readouterr().out
+    return result
+
+  @pytest.mark.usefixtures("_capture_stdout_log_without_rfc3339")
+  def test_stdout_logs_without_rfc3339(self):
+    assert self.captured_stdout == \
+      ("prefix_event, timestamp=1664582400, "
+       "window(start=1664582400, end=1664582460)\n"
+       "prefix_event, timestamp=1664668800, "
+       "window(start=1664668800, end=1664668860)\n"), \
       f'Received from stdout: {self.captured_stdout}'
 
   def test_ptransform_output(self):

Reply via email to