[ 
https://issues.apache.org/jira/browse/BEAM-3730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512821#comment-16512821
 ] 

Valentyn Tymofieiev commented on BEAM-3730:
-------------------------------------------

Currently, the AnyTypeConstraint and TypeVariable classes in 
sdks/python/apache_beam/typehints/typehints.py define __eq__ based on the 
object fields but don't define __hash__, which means a default implementation 
such as hash(id(self)) is used. 

Making __hash__ consistent with __eq__, as is done for all other classes in 
typehints, causes assertion failures in the Beam codebase that surface in 
integration tests:

 

======================================================================

FAIL: test_to_dict_runtime_check_satisfied 
(apache_beam.transforms.ptransform_test.PTransformTypeCheckTestCase) 
---------------------------------------------------------------------- 
Traceback (most recent call last): 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/transforms/ptransform_test.py",
 line 2004, in test_to_dict_runtime_check_satisfied 
 self.p.run() 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/testing/test_pipeline.py",
 line 102, in run 
 result = super(TestPipeline, self).run(test_runner_api) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/pipeline.py",
 line 402, in run
 return self.runner.run_pipeline(self)
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
 line 134, in run_pipeline
 return runner.run_pipeline(pipeline)
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 218, in run_pipeline
 return self.run_via_runner_api(pipeline.to_runner_api())
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 221, in run_via_runner_api
 return self.run_stages(*self.create_stages(pipeline_proto))
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 841, in run_stages
 pcoll_buffers, safe_coders).process_bundle.metrics
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 949, in run_stage
 self._progress_frequency).process_bundle(data_input, data_output)
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 1153, in process_bundle
 result_future = self._controller.control_handler.push(process_bundle)
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 1033, in push
 response = self.worker.do_instruction(request)
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 208, in do_instruction
 request.instruction_id)
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 230, in process_bundle
 processor.process_bundle(instruction_id)
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 295, in process_bundle
 input_op.process_encoded(data.data)
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 116, in process_encoded
 self.output(decoded_value)
 File "apache_beam/runners/worker/operations.py", line 174, in 
apache_beam.runners.worker.operations.Operation.output
 def output(self, windowed_value, output_index=0):
 File "apache_beam/runners/worker/operations.py", line 175, in 
apache_beam.runners.worker.operations.Operation.output
 cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
 File "apache_beam/runners/worker/operations.py", line 85, in 
apache_beam.runners.worker.operations.ConsumerSet.receive
 cython.cast(Operation, consumer).process(windowed_value)
 File "apache_beam/runners/worker/operations.py", line 445, in 
apache_beam.runners.worker.operations.CombineOperation.process
 self.output(
 File "apache_beam/runners/worker/operations.py", line 175, in 
apache_beam.runners.worker.operations.Operation.output
 cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
 File "apache_beam/runners/worker/operations.py", line 85, in 
apache_beam.runners.worker.operations.ConsumerSet.receive
 cython.cast(Operation, consumer).process(windowed_value)
 File "apache_beam/runners/worker/operations.py", line 445, in 
apache_beam.runners.worker.operations.CombineOperation.process
 self.output(
 File "apache_beam/runners/worker/operations.py", line 175, in 
apache_beam.runners.worker.operations.Operation.output
 cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
 File "apache_beam/runners/worker/operations.py", line 83, in 
apache_beam.runners.worker.operations.ConsumerSet.receive
 self.update_counters_start(windowed_value)
 File "apache_beam/runners/worker/operations.py", line 89, in 
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
 self.opcounter.update_from(windowed_value)
 File "apache_beam/runners/worker/opcounters.py", line 191, in 
apache_beam.runners.worker.opcounters.OperationCounters.update_from
 self.do_sample(windowed_value)
 File "apache_beam/runners/worker/opcounters.py", line 209, in 
apache_beam.runners.worker.opcounters.OperationCounters.do_sample
 self.coder_impl.get_estimated_size_and_observables(windowed_value))
 File "apache_beam/coders/coder_impl.py", line 835, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
 def get_estimated_size_and_observables(self, value, nested=False):
 File "apache_beam/coders/coder_impl.py", line 844, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
 self._value_coder.get_estimated_size_and_observables(
 File "apache_beam/coders/coder_impl.py", line 547, in 
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
 c.get_estimated_size_and_observables(
 File "apache_beam/coders/coder_impl.py", line 122, in 
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
 return self.estimate_size(value, nested), []
 File "apache_beam/coders/coder_impl.py", line 98, in 
apache_beam.coders.coder_impl.CoderImpl.estimate_size
 return self._get_nested_size(len(self.encode(value)), nested)
 File "apache_beam/coders/coder_impl.py", line 374, in 
apache_beam.coders.coder_impl.BytesCoderImpl.encode
 assert isinstance(value, bytes), (value, type(value))
AssertionError: (None, <type 'NoneType'>)

 

====================================================================== 
ERROR: test_to_dict_pipeline_check_satisfied 
(apache_beam.transforms.ptransform_test.PTransformTypeCheckTestCase) 
---------------------------------------------------------------------- 
Traceback (most recent call last): 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/transforms/ptransform_test.py",
 line 1992, in test_to_dict_pipeline_check_satisfied 
 self.p.run() 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/testing/test_pipeline.py",
 line 102, in run 
 result = super(TestPipeline, self).run(test_runner_api) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/pipeline.py",
 line 389, in run 
 self.to_runner_api(), self.runner, self._options).run(False) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/pipeline.py",
 line 402, in run 
 return self.runner.run_pipeline(self) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
 line 134, in run_pipeline 
 return runner.run_pipeline(pipeline) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 218, in run_pipeline 
 return self.run_via_runner_api(pipeline.to_runner_api()) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 221, in run_via_runner_api 
 return self.run_stages(*self.create_stages(pipeline_proto)) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 841, in run_stages 
 pcoll_buffers, safe_coders).process_bundle.metrics 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 949, in run_stage 
 self._progress_frequency).process_bundle(data_input, data_output) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 1153, in process_bundle 
 result_future = self._controller.control_handler.push(process_bundle) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
 line 1033, in push 
 response = self.worker.do_instruction(request) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 208, in do_instruction 
 request.instruction_id) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 230, in process_bundle 
 processor.process_bundle(instruction_id) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 295, in process_bundle 
 input_op.process_encoded(data.data) 
 File 
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 116, in process_encoded 
 self.output(decoded_value) 
 File "apache_beam/runners/worker/operations.py", line 174, in 
apache_beam.runners.worker.operations.Operation.output 
 def output(self, windowed_value, output_index=0): 
 File "apache_beam/runners/worker/operations.py", line 175, in 
apache_beam.runners.worker.operations.Operation.output 
 cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) 
 File "apache_beam/runners/worker/operations.py", line 85, in 
apache_beam.runners.worker.operations.ConsumerSet.receive 
 cython.cast(Operation, consumer).process(windowed_value) 
 File "apache_beam/runners/worker/operations.py", line 445, in 
apache_beam.runners.worker.operations.CombineOperation.process 
 self.output( 
 File "apache_beam/runners/worker/operations.py", line 175, in 
apache_beam.runners.worker.operations.Operation.output 
 cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) 
 File "apache_beam/runners/worker/operations.py", line 85, in 
apache_beam.runners.worker.operations.ConsumerSet.receive 
 cython.cast(Operation, consumer).process(windowed_value) 
 File "apache_beam/runners/worker/operations.py", line 445, in 
apache_beam.runners.worker.operations.CombineOperation.process 
 self.output( 
 File "apache_beam/runners/worker/operations.py", line 175, in 
apache_beam.runners.worker.operations.Operation.output 
 cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) 
 File "apache_beam/runners/worker/operations.py", line 83, in 
apache_beam.runners.worker.operations.ConsumerSet.receive 
 self.update_counters_start(windowed_value) 
 File "apache_beam/runners/worker/operations.py", line 89, in 
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start 
 self.opcounter.update_from(windowed_value) 
 File "apache_beam/runners/worker/opcounters.py", line 191, in 
apache_beam.runners.worker.opcounters.OperationCounters.update_from 
 self.do_sample(windowed_value) 
 File "apache_beam/runners/worker/opcounters.py", line 209, in 
apache_beam.runners.worker.opcounters.OperationCounters.do_sample 
 self.coder_impl.get_estimated_size_and_observables(windowed_value)) 
 File "apache_beam/coders/coder_impl.py", line 835, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
 
 def get_estimated_size_and_observables(self, value, nested=False): 
 File "apache_beam/coders/coder_impl.py", line 844, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
 
 self._value_coder.get_estimated_size_and_observables( 
 File "apache_beam/coders/coder_impl.py", line 547, in 
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
 
 c.get_estimated_size_and_observables( 
 File "apache_beam/coders/coder_impl.py", line 122, in 
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables 
 return self.estimate_size(value, nested), [] 
 File "apache_beam/coders/coder_impl.py", line 472, in 
apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size 
 return get_varint_size(value) 
 File "apache_beam/coders/stream.pyx", line 222, in 
apache_beam.coders.stream.get_varint_size 
 cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value): 
TypeError: an integer is required

======================================================================

 

 

 

> typehints.TypeVariable issues with __hash__
> -------------------------------------------
>
>                 Key: BEAM-3730
>                 URL: https://issues.apache.org/jira/browse/BEAM-3730
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Luke Zhu
>            Priority: Major
>
> This class currently does not define a __hash__ function. This violates the 
> assumption made by sets and dicts which contain TypeConstraint objects.
> In Python 3, classes which define __eq__ also need to explicitly define 
> __hash__ to be hashable. However, defining __hash__ causes 
> combiners_test.CombineTest.test_to_list_and_to_dict to throw an error in 
> slow_stream.py (adding NoneType and long).
> There are multiple TypeVariable instances with name 'K' or 'V' throughout the 
> codebase. The equality of these instances may possibly cause issues.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to