[https://issues.apache.org/jira/browse/BEAM-3369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297552#comment-16297552]
ASF GitHub Bot commented on BEAM-3369:
--------------------------------------
robertwb closed pull request #4293: [BEAM-3369] Allow any coder to specify its
deterministic variant.
URL: https://github.com/apache/beam/pull/4293
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 67d5adba60b..64902b592e3 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -97,6 +97,16 @@ def is_deterministic(self):
"""
return False
+ def as_deterministic_coder(self, step_label, error_message=None):
+ """Returns a deterministic version of self, if possible.
+
+ Otherwise raises a value error.
+ """
+ if self.is_deterministic():
+ return self
+ else:
+ raise ValueError(error_message or "'%s' cannot be made deterministic.")
+
def estimate_size(self, value):
"""Estimates the encoded size of the given value, in bytes.
@@ -497,6 +507,9 @@ def _create_impl(self):
return coder_impl.CallbackCoderImpl(
lambda x: dumps(x, HIGHEST_PROTOCOL), pickle.loads)
+ def as_deterministic_coder(self, step_label, error_message=None):
+ return DeterministicFastPrimitivesCoder(self, step_label)
+
class DillCoder(_PickleCoderBase):
"""Coder using dill's pickle functionality."""
@@ -544,6 +557,12 @@ def _create_impl(self):
def is_deterministic(self):
return self._fallback_coder.is_deterministic()
+ def as_deterministic_coder(self, step_label, error_message=None):
+ if self.is_deterministic():
+ return self
+ else:
+ return DeterministicFastPrimitivesCoder(self, step_label)
+
def as_cloud_object(self, is_pair_like=True):
value = super(FastCoder, self).as_cloud_object()
# We currently use this coder in places where we cannot infer the coder to
@@ -662,6 +681,13 @@ def _create_impl(self):
def is_deterministic(self):
return all(c.is_deterministic() for c in self._coders)
+ def as_deterministic_coder(self, step_label, error_message=None):
+ if self.is_deterministic():
+ return self
+ else:
+ return TupleCoder([c.as_deterministic_coder(step_label, error_message)
+ for c in self._coders])
+
@staticmethod
def from_type_hint(typehint, registry):
return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
@@ -731,6 +757,13 @@ def _create_impl(self):
def is_deterministic(self):
return self._elem_coder.is_deterministic()
+ def as_deterministic_coder(self, step_label, error_message=None):
+ if self.is_deterministic():
+ return self
+ else:
+ return TupleSequenceCoder(
+ self._elem_coder.as_deterministic_coder(step_label, error_message))
+
@staticmethod
def from_type_hint(typehint, registry):
return TupleSequenceCoder(registry.get_coder(typehint.inner_type))
@@ -761,6 +794,13 @@ def _create_impl(self):
def is_deterministic(self):
return self._elem_coder.is_deterministic()
+ def as_deterministic_coder(self, step_label, error_message=None):
+ if self.is_deterministic():
+ return self
+ else:
+ return IterableCoder(
+ self._elem_coder.as_deterministic_coder(step_label, error_message))
+
def as_cloud_object(self):
return {
'@type': 'kind:stream',
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index d871c31b2ec..82d296ddeb7 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -64,7 +64,6 @@ def MakeXyzs(v):
See apache_beam.typehints.decorators module for more details.
"""
-import logging
import warnings
from apache_beam.coders import coders
@@ -148,19 +147,7 @@ def verify_deterministic(self, key_coder, op_name, silent=True):
'and for custom key classes, by writing a '
'deterministic custom Coder. Please see the '
'documentation for more details.' % (key_coder, op_name))
- # TODO(vikasrk): PickleCoder will eventually be removed once its direct
- # usage is stopped.
- if isinstance(key_coder, (coders.PickleCoder,
- coders.FastPrimitivesCoder)):
- if not silent:
- logging.warning(error_msg)
- return coders.DeterministicFastPrimitivesCoder(key_coder, op_name)
- elif isinstance(key_coder, coders.TupleCoder):
- return coders.TupleCoder([
- self.verify_deterministic(coder, op_name, silent)
- for coder in key_coder.coders()])
- else:
- raise ValueError(error_msg)
+ return key_coder.as_deterministic_coder(op_name, error_msg)
else:
return key_coder
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Python HEAD fails tests due to a ValueError
> --------------------------------------------
>
> Key: BEAM-3369
> URL: https://issues.apache.org/jira/browse/BEAM-3369
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: Not applicable
> Reporter: Chamikara Jayalath
> Assignee: Robert Bradshaw
> Priority: Critical
>
> tfidf_test.py is failing. The full error message is given below.
> ValueError: The key coder "TupleCoder[BytesCoder, FastPrimitivesCoder]" for
> GroupByKey operation "GroupByKey" is not deterministic. This may result in
> incorrect pipeline output. This can be fixed by adding a type hint to the
> operation preceding the GroupByKey step, and for custom key classes, by
> writing a deterministic custom Coder. Please see the documentation for more
> details.
> This seems to be caused by the following commit (the test passes without it).
> https://github.com/apache/beam/commit/1acd1ae901eefbcc8249d90e12ca82db0f91e41e#commitcomment-26356503
> Robert, can you take a look?
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)