BjornPrime commented on issue #24313:
URL: https://github.com/apache/beam/issues/24313#issuecomment-1332571725

   Full job output can be found here: 
https://github.com/apache/beam/actions/runs/3411967870/jobs/5676856627
   
   Also see fuller output on the test failure, which was truncated in the 
initial post:
   ```
   ================================== FAILURES 
===================================
   __ PortableRunnerTestWithSubprocesses.test_pardo_state_with_custom_key_coder 
__
   [gw3] win32 -- Python 3.7.9 
D:\a\beam\beam\sdks\python\target\.tox\py37-win\Scripts\python.exe
   
   self = 
<apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
 testMethod=test_pardo_state_with_custom_key_coder>
   
       def test_pardo_state_with_custom_key_coder(self):
         """Tests that state requests work correctly when the key coder is an
         SDK-specific coder, i.e. non standard coder. This is additionally 
enforced
         by Java's ProcessBundleDescriptorsTest and by Flink's
         ExecutableStageDoFnOperator which detects invalid encoding by checking 
for
         the correct key group of the encoded key."""
         index_state_spec = userstate.CombiningValueStateSpec('index', sum)
       
         # Test params
         # Ensure decent amount of elements to serve all partitions
         n = 200
         duplicates = 1
       
         split = n // (duplicates + 1)
         inputs = [(i % split, str(i % split)) for i in range(0, n)]
       
         # Use a DoFn which has to use FastPrimitivesCoder because the type 
cannot
         # be inferred
         class Input(beam.DoFn):
           def process(self, impulse):
             for i in inputs:
               yield i
       
         class AddIndex(beam.DoFn):
           def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
             k, v = kv
             index.add(1)
             yield k, v, index.read()
       
         expected = [(i % split, str(i % split), i // split + 1)
                     for i in range(0, n)]
       
         with self.create_pipeline() as p:
           assert_that(
               p
               | beam.Impulse()
               | beam.ParDo(Input())
               | beam.ParDo(AddIndex()),
   >           equal_to(expected))
   
   apache_beam\runners\portability\portable_runner_test.py:207: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _
   apache_beam\pipeline.py:598: in __exit__
       self.result.wait_until_finish()
   apache_beam\runners\portability\portable_runner.py:607: in wait_until_finish
       raise self._runtime_exception
   apache_beam\runners\portability\portable_runner.py:613: in _observe_state
       for state_response in self._state_stream:
   
target\.tox\py37-win\lib\site-packages\grpc\_channel.py:426:
 in __next__
       return self._next()
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _
   
   self = <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.DEADLINE_EXCEEDED
        details = "Deadline Exc...%5B::1%5D:63832 
{created_time:"2022-11-07T16:19:05.79894806+00:00", grpc_status:4, 
grpc_message:"Deadline Exceeded"}"
   >
   
       def _next(self):
           with self._state.condition:
               if self._state.code is None:
                   event_handler = _event_handler(self._state,
                                                  self._response_deserializer)
                   self._state.due.add(cygrpc.OperationType.receive_message)
                   operating = self._call.operate(
                       (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                       event_handler)
                   if not operating:
                       
self._state.due.remove(cygrpc.OperationType.receive_message)
               elif self._state.code is grpc.StatusCode.OK:
                   raise StopIteration()
               else:
                   raise self
       
               def _response_ready():
                   return (self._state.response is not None or
                           (cygrpc.OperationType.receive_message
                            not in self._state.due and
                            self._state.code is not None))
       
               _common.wait(self._state.condition.wait, _response_ready)
               if self._state.response is not None:
                   response = self._state.response
                   self._state.response = None
                   return response
               elif cygrpc.OperationType.receive_message not in self._state.due:
                   if self._state.code is grpc.StatusCode.OK:
                       raise StopIteration()
                   elif self._state.code is not None:
   >                   raise self
   E                   grpc._channel._MultiThreadedRendezvous: 
<_MultiThreadedRendezvous of RPC that terminated with:
   E                    status = StatusCode.DEADLINE_EXCEEDED
   E                    details = "Deadline Exceeded"
   E                    debug_error_string = "UNKNOWN:Error received from peer 
ipv6:%5B::1%5D:63832 
{created_time:"2022-11-07T16:19:05.79894806+00:00",
 grpc_status:4, grpc_message:"Deadline Exceeded"}"
   E                   >
   
   target\.tox\py37-win\lib\site-packages\grpc\_channel.py:826: 
_MultiThreadedRendezvous
   ---------------------------- Captured stderr call 
-----------------------------
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to