[
https://issues.apache.org/jira/browse/BEAM-3730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512821#comment-16512821
]
Valentyn Tymofieiev commented on BEAM-3730:
-------------------------------------------
Currently, the AnyTypeConstraint and TypeVariable classes in
sdks/python/apache_beam/typehints/typehints.py define __eq__ based on the
objects' fields but do not define __hash__. As a result, under Python 2 the
default identity-based implementation (e.g. hash(id(self))) is used.
Making __hash__ consistent with __eq__, as is already done for all other
classes in typehints, causes assertion failures in the Beam codebase that
surface in integration tests:
======================================================================
FAIL: test_to_dict_runtime_check_satisfied
(apache_beam.transforms.ptransform_test.PTransformTypeCheckTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/transforms/ptransform_test.py",
line 2004, in test_to_dict_runtime_check_satisfied
self.p.run()
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/testing/test_pipeline.py",
line 102, in run
result = super(TestPipeline, self).run(test_runner_api)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/pipeline.py",
line 402, in run
return self.runner.run_pipeline(self)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
line 134, in run_pipeline
return runner.run_pipeline(pipeline)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 218, in run_pipeline
return self.run_via_runner_api(pipeline.to_runner_api())
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 221, in run_via_runner_api
return self.run_stages(*self.create_stages(pipeline_proto))
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 841, in run_stages
pcoll_buffers, safe_coders).process_bundle.metrics
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 949, in run_stage
self._progress_frequency).process_bundle(data_input, data_output)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 1153, in process_bundle
result_future = self._controller.control_handler.push(process_bundle)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 1033, in push
response = self.worker.do_instruction(request)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 208, in do_instruction
request.instruction_id)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 230, in process_bundle
processor.process_bundle(instruction_id)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 295, in process_bundle
input_op.process_encoded(data.data)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 116, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 174, in
apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
File "apache_beam/runners/worker/operations.py", line 175, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 85, in
apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 445, in
apache_beam.runners.worker.operations.CombineOperation.process
self.output(
File "apache_beam/runners/worker/operations.py", line 175, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 85, in
apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 445, in
apache_beam.runners.worker.operations.CombineOperation.process
self.output(
File "apache_beam/runners/worker/operations.py", line 175, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 83, in
apache_beam.runners.worker.operations.ConsumerSet.receive
self.update_counters_start(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
self.opcounter.update_from(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 191, in
apache_beam.runners.worker.opcounters.OperationCounters.update_from
self.do_sample(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 209, in
apache_beam.runners.worker.opcounters.OperationCounters.do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
File "apache_beam/coders/coder_impl.py", line 835, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
def get_estimated_size_and_observables(self, value, nested=False):
File "apache_beam/coders/coder_impl.py", line 844, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
self._value_coder.get_estimated_size_and_observables(
File "apache_beam/coders/coder_impl.py", line 547, in
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
c.get_estimated_size_and_observables(
File "apache_beam/coders/coder_impl.py", line 122, in
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
return self.estimate_size(value, nested), []
File "apache_beam/coders/coder_impl.py", line 98, in
apache_beam.coders.coder_impl.CoderImpl.estimate_size
return self._get_nested_size(len(self.encode(value)), nested)
File "apache_beam/coders/coder_impl.py", line 374, in
apache_beam.coders.coder_impl.BytesCoderImpl.encode
assert isinstance(value, bytes), (value, type(value))
AssertionError: (None, <type 'NoneType'>)
======================================================================
ERROR: test_to_dict_pipeline_check_satisfied
(apache_beam.transforms.ptransform_test.PTransformTypeCheckTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/transforms/ptransform_test.py",
line 1992, in test_to_dict_pipeline_check_satisfied
self.p.run()
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/testing/test_pipeline.py",
line 102, in run
result = super(TestPipeline, self).run(test_runner_api)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/pipeline.py",
line 389, in run
self.to_runner_api(), self.runner, self._options).run(False)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/pipeline.py",
line 402, in run
return self.runner.run_pipeline(self)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
line 134, in run_pipeline
return runner.run_pipeline(pipeline)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 218, in run_pipeline
return self.run_via_runner_api(pipeline.to_runner_api())
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 221, in run_via_runner_api
return self.run_stages(*self.create_stages(pipeline_proto))
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 841, in run_stages
pcoll_buffers, safe_coders).process_bundle.metrics
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 949, in run_stage
self._progress_frequency).process_bundle(data_input, data_output)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 1153, in process_bundle
result_future = self._controller.control_handler.push(process_bundle)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 1033, in push
response = self.worker.do_instruction(request)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 208, in do_instruction
request.instruction_id)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 230, in process_bundle
processor.process_bundle(instruction_id)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 295, in process_bundle
input_op.process_encoded(data.data)
File
"/usr/local/google/home/valentyn/projects/beam/microb/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 116, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 174, in
apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
File "apache_beam/runners/worker/operations.py", line 175, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 85, in
apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 445, in
apache_beam.runners.worker.operations.CombineOperation.process
self.output(
File "apache_beam/runners/worker/operations.py", line 175, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 85, in
apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 445, in
apache_beam.runners.worker.operations.CombineOperation.process
self.output(
File "apache_beam/runners/worker/operations.py", line 175, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 83, in
apache_beam.runners.worker.operations.ConsumerSet.receive
self.update_counters_start(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
self.opcounter.update_from(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 191, in
apache_beam.runners.worker.opcounters.OperationCounters.update_from
self.do_sample(windowed_value)
File "apache_beam/runners/worker/opcounters.py", line 209, in
apache_beam.runners.worker.opcounters.OperationCounters.do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
File "apache_beam/coders/coder_impl.py", line 835, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
def get_estimated_size_and_observables(self, value, nested=False):
File "apache_beam/coders/coder_impl.py", line 844, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
self._value_coder.get_estimated_size_and_observables(
File "apache_beam/coders/coder_impl.py", line 547, in
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
c.get_estimated_size_and_observables(
File "apache_beam/coders/coder_impl.py", line 122, in
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
return self.estimate_size(value, nested), []
File "apache_beam/coders/coder_impl.py", line 472, in
apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size
return get_varint_size(value)
File "apache_beam/coders/stream.pyx", line 222, in
apache_beam.coders.stream.get_varint_size
cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
TypeError: an integer is required
======================================================================
> typehints.TypeVariable issues with __hash__
> -------------------------------------------
>
> Key: BEAM-3730
> URL: https://issues.apache.org/jira/browse/BEAM-3730
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Luke Zhu
> Priority: Major
>
> This class currently does not define a __hash__ method. This violates the
> assumption made by sets and dicts that contain TypeConstraint objects:
> objects that compare equal must also have equal hashes.
> In Python 3, classes that define __eq__ must also explicitly define
> __hash__ to be hashable. However, adding such a __hash__ causes
> combiners_test.CombineTest.test_to_list_and_to_dict to throw an error in
> slow_stream.py (adding NoneType and long).
> There are multiple TypeVariable instances named 'K' or 'V' throughout the
> codebase. Because these instances compare equal to one another, their
> equality may cause issues.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)