[ 
https://issues.apache.org/jira/browse/BEAM-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191337#comment-17191337
 ] 

Beam JIRA Bot commented on BEAM-9527:
-------------------------------------

This issue is assigned but has not received an update in 30 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.

> apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerSplitTest.test_split_crazy_sdf
>  is flaky
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-9527
>                 URL: https://issues.apache.org/jira/browse/BEAM-9527
>             Project: Beam
>          Issue Type: Bug
>          Components: test-failures
>            Reporter: Valentyn Tymofieiev
>            Assignee: Boyuan Zhang
>            Priority: P1
>              Labels: beam-fixit, flake, stale-assigned
>
> {noformat}
> self = <apache_beam.runners.portability.fn_api_runner.BundleManager object at 
> 0x7fe494edb450>
> split_manager = <function split_manager at 0x7fe4c2ff0c80>
> inputs = {'ref_PCollection_PCollection_3_split/Read': 
> ['\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x08V\xff\x80\x02capache_beam....\nOffsetRange\nq\x01)\x81q\x02}q\x03(U\x04stopq\x04K\x05U\x05startq\x05K\x00ub.\x01\x00@\x14\x00\x00\x00\x00\x00\x00']}
> process_bundle_id = 'bundle_2575'
>     def _generate_splits_for_testing(self,
>                                      split_manager,
>                                      inputs,  # type: Mapping[str, 
> PartitionableBuffer]
>                                      process_bundle_id):
>       # type: (...) -> List[beam_fn_api_pb2.ProcessBundleSplitResponse]
>       split_results = []  # type: 
> List[beam_fn_api_pb2.ProcessBundleSplitResponse]
>       read_transform_id, buffer_data = only_element(inputs.items())
>       byte_stream = b''.join(buffer_data)
>       num_elements = len(
>           list(
>               self._get_input_coder_impl(read_transform_id).decode_all(
>                   byte_stream)))
>     
>       # Start the split manager in case it wants to set any breakpoints.
>       split_manager_generator = split_manager(num_elements)
>       try:
>         split_fraction = next(split_manager_generator)
>         done = False
>       except StopIteration:
>         done = True
>     
>       # Send all the data.
>       self._send_input_to_worker(
>           process_bundle_id, read_transform_id, [byte_stream])
>     
>       assert self._worker_handler is not None
>     
>       # Execute the requested splits.
>       while not done:
>         if split_fraction is None:
>           split_result = None
>         else:
>           split_request = beam_fn_api_pb2.InstructionRequest(
>               process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest(
>                   instruction_id=process_bundle_id,
>                   desired_splits={
>                       read_transform_id: beam_fn_api_pb2.
>                       ProcessBundleSplitRequest.DesiredSplit(
>                           fraction_of_remainder=split_fraction,
>                           estimated_input_elements=num_elements)
>                   }))
>           split_response = self._worker_handler.control_conn.push(
>               split_request).get()  # type: 
> beam_fn_api_pb2.InstructionResponse
>           for t in (0.05, 0.1, 0.2):
>             waiting = ('Instruction not running', 'not yet scheduled')
>             if any(msg in split_response.error for msg in waiting):
>               time.sleep(t)
>               split_response = self._worker_handler.control_conn.push(
>                   split_request).get()
>           if 'Unknown process bundle' in split_response.error:
>             # It may have finished too fast.
>             split_result = None
>           elif split_response.error:
> >           raise RuntimeError(split_response.error)
> E           RuntimeError: Traceback (most recent call last):
> E             File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 190, in _execute
> E               response = task()
> E             File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 229, in <lambda>
> E               lambda: self.create_worker().do_instruction(request), request)
> E             File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 416, in do_instruction
> E               getattr(request, request_type), request.instruction_id)
> E             File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 479, in process_bundle_split
> E               process_bundle_split=processor.try_split(request))
> E             File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
>  line 882, in try_split
> E               desired_split.estimated_input_elements)
> E             File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
>  line 250, in try_split
> E               keep_of_element_remainder
> E             File "apache_beam/runners/worker/operations.py", line 202, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.try_split
> E               return self.consumer.try_split(fraction_of_remainder)
> E             File "apache_beam/runners/worker/operations.py", line 804, in 
> apache_beam.runners.worker.operations.SdfProcessSizedElements.try_split
> E               split = self.dofn_runner.try_split(fraction_of_remainder)
> E             File "apache_beam/runners/common.py", line 973, in 
> apache_beam.runners.common.DoFnRunner.try_split
> E               return self.do_fn_invoker.try_split(fraction)
> E             File "apache_beam/runners/common.py", line 839, in 
> apache_beam.runners.common.PerWindowInvoker.try_split
> E               self.threadsafe_watermark_estimator.current_watermark())
> E           AttributeError: 'NoneType' object has no attribute 
> 'current_watermark'
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to