[
https://issues.apache.org/jira/browse/BEAM-562?focusedWorklogId=229962&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-229962
]
ASF GitHub Bot logged work on BEAM-562:
---------------------------------------
Author: ASF GitHub Bot
Created on: 19/Apr/19 00:21
Start Date: 19/Apr/19 00:21
Worklog Time Spent: 10m
Work Description: yifanmai commented on pull request #7994: [BEAM-562] Add DoFn.setup and DoFn.teardown to Python SDK
URL: https://github.com/apache/beam/pull/7994#discussion_r276873522
##########
File path: sdks/python/apache_beam/transforms/dofn_lifecycle_test.py
##########
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Unit tests for DoFn lifecycle and bundle methods."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+
+class CallSequenceEnforcingDoFn(beam.DoFn):
+  def __init__(self):
+    self._setup_called = False
+    self._start_bundle_calls = 0
+    self._finish_bundle_calls = 0
+    self._teardown_called = False
+
+  def setup(self):
+    # Use assert instead of unittest.TestCase.assert* methods because
+    # this is not a TestCase
+    assert not self._setup_called, 'setup should not be called twice'
+    assert self._start_bundle_calls == 0, \
+        'setup should be called before start_bundle'
+    assert self._finish_bundle_calls == 0, \
+        'setup should be called before finish_bundle'
+    assert not self._teardown_called, 'setup should be called before teardown'
+    self._setup_called = True
+
+  def start_bundle(self):
+    # Use assert instead of unittest.TestCase.assert* methods because
+    # this is not a TestCase
+    assert self._setup_called, 'setup should have been called'
+    assert self._start_bundle_calls == self._finish_bundle_calls, \
+        'there should be as many start_bundle calls as finish_bundle calls'
+    assert not self._teardown_called, 'teardown should not have been called'
+    self._start_bundle_calls += 1
+
+  def process(self, element):
+    # Use assert instead of unittest.TestCase.assert* methods because
+    # this is not a TestCase
+    assert self._setup_called, 'setup should have been called'
+    assert self._start_bundle_calls > 0, 'start_bundle should have been called'
+    assert self._start_bundle_calls == self._finish_bundle_calls + 1, \
+        'there should be one start_bundle call with no call to finish_bundle'
+    assert not self._teardown_called, 'teardown should not have been called'
+    return [element * element]
+
+  def finish_bundle(self):
+    # Use assert instead of unittest.TestCase.assert* methods because
+    # this is not a TestCase
+    assert self._setup_called, 'setup should have been called'
+    assert self._start_bundle_calls > 0, 'start_bundle should have been called'
+    assert self._start_bundle_calls == self._finish_bundle_calls + 1, \
+        'there should be one start_bundle call with no call to finish_bundle'
+    assert not self._teardown_called, 'teardown should not have been called'
+    self._finish_bundle_calls += 1
+
+  def teardown(self):
+    # Use assert instead of unittest.TestCase.assert* methods because
+    # this is not a TestCase
+    assert self._setup_called, 'setup should have been called'
+    assert self._start_bundle_calls == self._finish_bundle_calls, \
+        'there should be as many start_bundle calls as finish_bundle calls'
+    assert not self._teardown_called, 'teardown should not be called twice'
+    # Unfortunately there is no way to assert that teardown was actually called.
+    # This is acceptable because teardown is best-effort.
+    self._teardown_called = True
+
+
+@attr('ValidatesRunner')
+class DoFnLifecycleTest(unittest.TestCase):
+  def test_dofn_lifecycle(self):
+    with TestPipeline() as p:
+      (p
+       | 'Start' >> beam.Create([1, 2, 3])
+       | 'Do' >> beam.ParDo(CallSequenceEnforcingDoFn()))
Review comment:
Would that actually work? My understanding is that with remote runners, the
DoFn instance here and the one on the worker are different physical instances,
and there isn't a way to check for state changes on the DoFn on the worker to
figure out if teardown was called or not.
Also, when I was doing print debugging, it looks like teardown is not being
called on the direct runner.
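The reviewer's point about separate physical instances can be illustrated without Beam at all. This is a minimal sketch under stated assumptions: plain `pickle` stands in for whatever serialization a runner uses to ship a DoFn to a worker, and `TeardownTrackingDoFn` is a hypothetical class, not part of the PR.

```python
import pickle


class TeardownTrackingDoFn(object):
    """Hypothetical stand-in for a DoFn that records whether teardown ran."""

    def __init__(self):
        self.teardown_called = False

    def teardown(self):
        self.teardown_called = True


# The instance built in the test body, i.e. the argument passed to ParDo.
pipeline_side = TeardownTrackingDoFn()

# Simulate shipping the DoFn to a worker: serialize it, then deserialize a copy.
# Assumption: plain pickle models the runner's serialization step.
worker_side = pickle.loads(pickle.dumps(pipeline_side))

# The worker drives the lifecycle on its own copy.
worker_side.teardown()

print(worker_side.teardown_called)    # True
print(pipeline_side.teardown_called)  # False: the test's instance never changes
print(worker_side is pipeline_side)   # False
```

Because the test only holds a reference to the pre-serialization instance, any flag set on the worker copy is invisible to it, which is why the test cannot assert that teardown actually ran.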
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 229962)
Time Spent: 4.5h (was: 4h 20m)
> DoFn Reuse: Add new methods to DoFn
> -----------------------------------
>
> Key: BEAM-562
> URL: https://issues.apache.org/jira/browse/BEAM-562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Ahmet Altay
> Priority: Major
> Labels: sdk-consistency, triaged
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> The Java SDK added setup and teardown methods to DoFns. This makes DoFns
> reusable and provides performance improvements. The Python SDK should add
> support for these new DoFn methods.
> Proposal doc:
> https://docs.google.com/document/d/1LLQqggSePURt3XavKBGV7SZJYQ4NW8yCu63lBchzMRk/edit?ts=5771458f#
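The reuse benefit described above can be sketched in plain Python. This is a hypothetical, single-threaded stand-in for a runner, not Beam's implementation: `ExpensiveClient`, `SquareDoFn`, and `run_bundles` are illustrative names, and the methods only mirror the names `setup`, `start_bundle`, `process`, `finish_bundle`, and `teardown` used in the test above.

```python
class ExpensiveClient(object):
    """Hypothetical resource that is costly to create (e.g. a DB connection)."""
    instances_created = 0

    def __init__(self):
        ExpensiveClient.instances_created += 1
        self.closed = False

    def close(self):
        self.closed = True


class SquareDoFn(object):
    """Mirrors beam.DoFn method names without depending on Beam."""

    def setup(self):
        # Called once per DoFn instance, before any bundle is processed.
        self.client = ExpensiveClient()

    def start_bundle(self):
        pass  # Per-bundle initialization would go here.

    def process(self, element):
        return [element * element]

    def finish_bundle(self):
        pass  # Per-bundle flushing would go here.

    def teardown(self):
        # In a real runner this is best-effort and may not run (e.g. on a crash).
        self.client.close()


def run_bundles(dofn, bundles):
    """Hypothetical runner that reuses one DoFn instance across many bundles."""
    outputs = []
    dofn.setup()
    try:
        for bundle in bundles:
            dofn.start_bundle()
            for element in bundle:
                outputs.extend(dofn.process(element))
            dofn.finish_bundle()
    finally:
        dofn.teardown()
    return outputs


dofn = SquareDoFn()
result = run_bundles(dofn, [[1, 2], [3]])
print(result)                            # [1, 4, 9]
print(ExpensiveClient.instances_created) # 1: one client shared by both bundles
print(dofn.client.closed)                # True
```

The expensive resource is created once in `setup` and shared across both bundles, rather than being rebuilt in every `start_bundle` call.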
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)