y1chi commented on a change in pull request #11916:
URL: https://github.com/apache/beam/pull/11916#discussion_r438941603



##########
File path: sdks/python/apache_beam/runners/direct/direct_userstate.py
##########
@@ -25,6 +25,7 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.trigger import _ListStateTag
 from apache_beam.transforms.trigger import _SetStateTag
+from apache_beam.transforms.trigger import _ValueStateTag

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/transforms/userstate_test.py
##########
@@ -452,6 +459,40 @@ def clear_values(self, 
bag_state=beam.DoFn.StateParam(BAG_STATE)):
 
     self.assertEqual(['extra'], StatefulDoFnOnDirectRunnerTest.all_records)
 
+  def test_simple_read_modify_write_stateful_dofn(self):
+    class SimpleTestReadModifyWriteStatefulDoFn(DoFn):
+      VALUE_STATE = ReadModifyWriteStateSpec('value', StrUtf8Coder())
+
+      def process(self, element, last_element=DoFn.StateParam(VALUE_STATE)):
+        last_element.write('%s:%s' % element)
+        yield last_element.read()
+
+    with TestPipeline() as p:
+      (
+          p | beam.Create([('a', 1), ('b', 3), ('c', 5)])
+          | beam.ParDo(SimpleTestReadModifyWriteStatefulDoFn())
+          | beam.ParDo(self.record_dofn()))
+    self.assertEqual(['a:1', 'b:3', 'c:5'],
+                     StatefulDoFnOnDirectRunnerTest.all_records)
+
+  def test_clearing_read_modify_write_state(self):
+    class SimpleClearingReadModifyWriteStatefulDoFn(DoFn):
+      VALUE_STATE = ReadModifyWriteStateSpec('value', VarIntCoder())
+
+      def process(self, element, last_element=DoFn.StateParam(VALUE_STATE)):
+        last_element.write(element[1])

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to