[ 
https://issues.apache.org/jira/browse/BEAM-10813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Webb updated BEAM-10813:
-----------------------------
    Resolution: Won't Fix
        Status: Resolved  (was: Triage Needed)

Old issue; resolving.

 

> TypeError: Expected bytes, got list in 
> apache_beam.coders.coder_impl.BytesCoderImpl.encode_to_stream
> ----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-10813
>                 URL: https://issues.apache.org/jira/browse/BEAM-10813
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-py-harness
>         Environment: Flink 1.10.1
> Beam worker pool: apache/beam_python3.6_sdk:2.22.0
> SDK: apache beam 2.22.0
> Python 3.6
>            Reporter: Jiaxin Shan
>            Priority: P3
>
> I am trying to run 
> [https://github.com/tensorflow/tfx/tree/master/tfx/examples/chicago_taxi_pipeline]
>  using Flink Runner on Kubernetes. 
> I resolved a few issues and finally got beam-worker-pool to pick up the 
> tasks. Then I got the following error messages:
>  
> {code:java}
> // code placeholder
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
>     self.run()
>   File "/usr/local/lib/python3.6/threading.py", line 864, in run
>     self._target(*self._args, **self._kwargs)
>   File 
> "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py",
>  line 545, in <lambda>
>     target=lambda: self._read_inputs(elements_iterator),
>   File 
> "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py",
>  line 528, in _read_inputs
>     for elements in elements_iterator:
>   File "/usr/local/lib/python3.6/site-packages/grpc/_channel.py", line 416, 
> in __next__
>     return self._next()
>   File "/usr/local/lib/python3.6/site-packages/grpc/_channel.py", line 689, 
> in _next
>     raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that 
> terminated with:
>         status = StatusCode.UNAVAILABLE
>         details = "DNS resolution failed for service: "
>         debug_error_string = 
> "{"created":"@1598390105.406081334","description":"Resolver transient 
> failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":213,"referenced_errors":[{"created":"@1598390105.406079944","description":"DNS
>  resolution failed for service: 
> ","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":377,"grpc_status":14,"referenced_errors":[{"created":"@1598390105.406076335","description":"unparseable
>  
> host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":410,"target_address":""}]}]}"
> >Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 961, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 553, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1122, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 194, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 164, in 
> apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 210, in 
> apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 230, in 
> apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1187, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1198, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1000, in 
> apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 192, in 
> apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 165, in 
> apache_beam.coders.coder_impl.CoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 488, in 
> apache_beam.coders.coder_impl.BytesCoderImpl.encode_to_stream
> TypeError: Expected bytes, got list
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 245, in _execute
>     response = task()
>   File 
> "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 302, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File 
> "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 471, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File 
> "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 506, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 972, in process_bundle
>     element.data)
>   File 
> "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 218, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 330, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 332, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 195, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 755, in 
> apache_beam.runners.worker.operations.SdfProcessSizedElements.process
>   File "apache_beam/runners/worker/operations.py", line 764, in 
> apache_beam.runners.worker.operations.SdfProcessSizedElements.process
>   File "apache_beam/runners/common.py", line 971, in 
> apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
>   File "apache_beam/runners/common.py", line 711, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 807, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1122, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 195, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 670, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 671, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 963, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1045, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", 
> line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 961, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 553, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1122, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 194, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 164, in 
> apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 210, in 
> apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 230, in 
> apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1187, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1198, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1000, in 
> apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 192, in 
> apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 165, in 
> apache_beam.coders.coder_impl.CoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 488, in 
> apache_beam.coders.coder_impl.BytesCoderImpl.encode_to_stream
> TypeError: Expected bytes, got list [while running 
> 'InputToSerializedExample/InputSourceToExample/ParseCSVLine']
> 2020/08/25 21:15:10 Python exited: <nil>
> {code}
> This code runs with the DirectRunner without any problems; I am not sure why it 
> doesn't work on Flink. TFX job information:
>  
> {code:java}
>  - --pipeline_name      - parameterized_tfx_oss      - --pipeline_root      - 
> '{{inputs.parameters.pipeline-root}}'      - --kubeflow_metadata_config      
> - |-        {          "grpc_config": {            "grpc_service_host": {     
>          "environment_variable": "METADATA_GRPC_SERVICE_SERVICE_HOST"         
>    },            "grpc_service_port": {              "environment_variable": 
> "METADATA_GRPC_SERVICE_SERVICE_PORT"            }          }        }      - 
> --beam_pipeline_args      - '["--runner=FlinkRunner", 
> "--flink_master=beam-flink-cluster-jobmanager:8081",        
> "--flink_submit_uber_jar", "--environment_type=EXTERNAL", 
> "--environment_config=localhost:50000"]'      - --additional_pipeline_args    
>   - '{}'      - --component_launcher_class_path      - 
> tfx.orchestration.launcher.in_process_component_launcher.InProcessComponentLauncher
>       - --serialized_component      - '{"__class__": "NodeWrapper", 
> "__module__": "tfx.orchestration.kubeflow.node_wrapper",        
> "__tfx_object_type__": "jsonable", "_exec_properties": {"custom_config": 
> null,        "input_config": "{\n  \"splits\": [\n    {\n      \"name\": 
> \"single_split\",\n      \"pattern\":        \"*\"\n    }\n  ]\n}", 
> "output_config": "{\n  \"split_config\": {\n    \"splits\":        [\n      
> {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      
> {\n        \"hash_buckets\":        1,\n        \"name\": \"eval\"\n      }\n 
>    ]\n  }\n}"}, "_id": "CsvExampleGen",        "_inputs": {"__class__": 
> "_PropertyDictWrapper", "__module__": "tfx.types.node_common",        
> "__tfx_object_type__": "jsonable", "_compat_aliases": {"input_base": 
> "input"},        "_data": {"input": {"__class__": "Channel", "__module__": 
> "tfx.types.channel",        "__tfx_object_type__": "jsonable", "artifacts": 
> [{"__artifact_class_module__":        "tfx.types.standard_artifacts", 
> "__artifact_class_name__": "ExternalArtifact",        "artifact": {"uri": 
> "{{inputs.parameters.data-root}}"}, "artifact_type": {"name":        
> "ExternalArtifact"}}], "output_key": null, "producer_component_id": null,     
>    "type": {"name": "ExternalArtifact"}}}}, "_outputs": {"__class__": 
> "_PropertyDictWrapper",        "__module__": "tfx.types.node_common", 
> "__tfx_object_type__": "jsonable",        "_compat_aliases": {}, "_data": 
> {"examples": {"__class__": "Channel", "__module__":        
> "tfx.types.channel", "__tfx_object_type__": "jsonable", "artifacts": 
> [{"__artifact_class_module__":        "tfx.types.standard_artifacts", 
> "__artifact_class_name__": "Examples", "artifact":        
> {"custom_properties": {"name": {"string_value": "examples"}, "pipeline_name": 
>        {"string_value": "parameterized_tfx_oss"}, "producer_component": 
> {"string_value":        "CsvExampleGen"}}, "properties": {"split_names": 
> {"string_value": "[\"train\",        \"eval\"]"}}}, "artifact_type": {"name": 
> "Examples", "properties": {"span":        "INT", "split_names": "STRING"}}}], 
> "output_key": "examples", "producer_component_id":        "CsvExampleGen", 
> "type": {"name": "Examples", "properties": {"span": "INT",        
> "split_names": "STRING"}}}}}, "_type": 
> "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen",        
> "driver_class": {"__class__": "Driver", "__module__": 
> "tfx.components.example_gen.driver",        "__tfx_object_type__": "class"}, 
> "executor_spec": {"__class__": "ExecutorClassSpec",        "__module__": 
> "tfx.components.base.executor_spec", "__tfx_object_type__":        
> "jsonable", "executor_class": {"__class__": "Executor", "__module__": 
> "tfx.components.example_gen.csv_example_gen.executor",        
> "__tfx_object_type__": "class"}}}'      - --component_config      - 'null'    
>   - --enable_cache      command: [python, 
> /tfx-src/tfx/orchestration/kubeflow/container_entrypoint.py]
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to