This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f2d0dc8aecc Improve LogElements to show pane_info and timestamps in seconds. (#35387)
f2d0dc8aecc is described below
commit f2d0dc8aecc116193c319773770c64f08f3ae833
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Jun 23 16:11:32 2025 -0400
Improve LogElements to show pane_info and timestamps in seconds. (#35387)
* Improve LogElements to show pane_info and timestamps in seconds.
* Rename the new argument to use_epoch. Adjust argument order.
---
sdks/python/apache_beam/transforms/util.py | 42 ++++++++++++++++++----
sdks/python/apache_beam/transforms/util_test.py | 46 +++++++++++++++++++++++--
2 files changed, 79 insertions(+), 9 deletions(-)
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 08e111992ef..07d82290212 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -1445,30 +1445,49 @@ class LogElements(PTransform):
level: (optional) The logging level for the output (e.g. `logging.DEBUG`,
`logging.INFO`, `logging.WARNING`, `logging.ERROR`). If not specified,
the log is printed to stdout.
+ with_pane_info (bool): (optional) Whether to include element's pane info.
+ use_epoch_time (bool): (optional) Whether to display epoch timestamps.
"""
class _LoggingFn(DoFn):
def __init__(
- self, prefix='', with_timestamp=False, with_window=False, level=None):
+ self,
+ prefix='',
+ with_timestamp=False,
+ with_window=False,
+ level=None,
+ with_pane_info=False,
+ use_epoch_time=False):
super().__init__()
self.prefix = prefix
self.with_timestamp = with_timestamp
self.with_window = with_window
self.level = level
+ self.with_pane_info = with_pane_info
+ self.use_epoch_time = use_epoch_time
+
+ def format_timestamp(self, timestamp):
+ if self.use_epoch_time:
+ return timestamp.seconds()
+ return timestamp.to_rfc3339()
def process(
self,
element,
timestamp=DoFn.TimestampParam,
window=DoFn.WindowParam,
+ pane_info=DoFn.PaneInfoParam,
**kwargs):
log_line = self.prefix + str(element)
if self.with_timestamp:
- log_line += ', timestamp=' + repr(timestamp.to_rfc3339())
+ log_line += ', timestamp=' + repr(self.format_timestamp(timestamp))
if self.with_window:
- log_line += ', window(start=' + window.start.to_rfc3339()
- log_line += ', end=' + window.end.to_rfc3339() + ')'
+ log_line += ', window(start=' + str(self.format_timestamp(window.start))
+ log_line += ', end=' + str(self.format_timestamp(window.end)) + ')'
+
+ if self.with_pane_info:
+ log_line += ', pane_info=' + repr(pane_info)
if self.level == logging.DEBUG:
logging.debug(log_line)
@@ -1491,17 +1510,28 @@ class LogElements(PTransform):
prefix='',
with_timestamp=False,
with_window=False,
- level=None):
+ level=None,
+ with_pane_info=False,
+ use_epoch_time=False,
+ ):
super().__init__(label)
self.prefix = prefix
self.with_timestamp = with_timestamp
self.with_window = with_window
+ self.with_pane_info = with_pane_info
+ self.use_epoch_time = use_epoch_time
self.level = level
def expand(self, input):
return input | ParDo(
self._LoggingFn(
- self.prefix, self.with_timestamp, self.with_window, self.level))
+ self.prefix,
+ self.with_timestamp,
+ self.with_window,
+ self.level,
+ self.with_pane_info,
+ self.use_epoch_time,
+ ))
class Reify(object):
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index ac703dd53e5..a9bec0df973 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -1613,7 +1613,10 @@ class LogElementsTest(unittest.TestCase):
])
| beam.WindowInto(FixedWindows(60))
| util.LogElements(
- prefix='prefix_', with_window=True, with_timestamp=True))
+ prefix='prefix_',
+ with_window=True,
+ with_timestamp=True,
+ with_pane_info=True))
request.captured_stdout = capsys.readouterr().out
return result
@@ -1622,9 +1625,46 @@ class LogElementsTest(unittest.TestCase):
def test_stdout_logs(self):
assert self.captured_stdout == \
("prefix_event, timestamp='2022-10-01T00:00:00Z', "
- "window(start=2022-10-01T00:00:00Z, end=2022-10-01T00:01:00Z)\n"
+ "window(start=2022-10-01T00:00:00Z, end=2022-10-01T00:01:00Z), "
+ "pane_info=PaneInfo(first: True, last: True, timing: UNKNOWN, "
+ "index: 0, nonspeculative_index: 0)\n"
"prefix_event, timestamp='2022-10-02T00:00:00Z', "
- "window(start=2022-10-02T00:00:00Z, end=2022-10-02T00:01:00Z)\n"), \
+ "window(start=2022-10-02T00:00:00Z, end=2022-10-02T00:01:00Z), "
+ "pane_info=PaneInfo(first: True, last: True, timing: UNKNOWN, "
+ "index: 0, nonspeculative_index: 0)\n"), \
+ f'Received from stdout: {self.captured_stdout}'
+
+ @pytest.fixture(scope="function")
+ def _capture_stdout_log_without_rfc3339(request, capsys):
+ with TestPipeline() as p:
+ result = (
+ p | beam.Create([
+ TimestampedValue(
+ "event",
+ datetime(2022, 10, 1, 0, 0, 0, 0,
+ tzinfo=pytz.UTC).timestamp()),
+ TimestampedValue(
+ "event",
+ datetime(2022, 10, 2, 0, 0, 0, 0,
+ tzinfo=pytz.UTC).timestamp()),
+ ])
+ | beam.WindowInto(FixedWindows(60))
+ | util.LogElements(
+ prefix='prefix_',
+ with_window=True,
+ with_timestamp=True,
+ use_epoch_time=True))
+
+ request.captured_stdout = capsys.readouterr().out
+ return result
+
+ @pytest.mark.usefixtures("_capture_stdout_log_without_rfc3339")
+ def test_stdout_logs_without_rfc3339(self):
+ assert self.captured_stdout == \
+ ("prefix_event, timestamp=1664582400, "
+ "window(start=1664582400, end=1664582460)\n"
+ "prefix_event, timestamp=1664668800, "
+ "window(start=1664668800, end=1664668860)\n"), \
f'Received from stdout: {self.captured_stdout}'
def test_ptransform_output(self):