[ 
https://issues.apache.org/jira/browse/BEAM-10824?focusedWorklogId=481251&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-481251
 ]

ASF GitHub Bot logged work on BEAM-10824:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Sep/20 05:36
            Start Date: 10/Sep/20 05:36
    Worklog Time Spent: 10m 
      Work Description: monicadsong commented on a change in pull request #12756:
URL: https://github.com/apache/beam/pull/12756#discussion_r486076366



##########
File path: sdks/python/apache_beam/transforms/stats_test.py
##########
@@ -41,355 +41,88 @@
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.stats import ApproximateQuantilesCombineFn
+from apache_beam.transforms.stats import ApproximateUniqueCombineFn
 
+try:
+  import mmh3
+  mmh3_options = [(mmh3, ), (None, )]
+except ImportError:
+  mmh3_options = [(None, )]
 
-class ApproximateUniqueTest(unittest.TestCase):
-  """Unit tests for ApproximateUnique.Globally and ApproximateUnique.PerKey.
-  Hash() with Python3 is nondeterministic, so Approximation algorithm generates
-  different result each time and sometimes error rate is out of range, so add
-  retries for all tests who actually running approximation algorithm."""
-  def test_approximate_unique_global_by_invalid_size(self):
-    # test if the transformation throws an error as expected with an invalid
-    # small input size (< 16).
-    sample_size = 10
-    test_input = [random.randint(0, 1000) for _ in range(100)]
-
-    with self.assertRaises(ValueError) as e:
-      with TestPipeline() as pipeline:
-        _ = (
-            pipeline
-            | 'create' >> beam.Create(test_input)
-            |
-            'get_estimate' >> 
beam.ApproximateUnique.Globally(size=sample_size))
-
-    expected_msg = beam.ApproximateUnique._INPUT_SIZE_ERR_MSG % (sample_size)
-
-    assert e.exception.args[0] == expected_msg
-
-  def test_approximate_unique_global_by_invalid_type_size(self):
-    # test if the transformation throws an error as expected with an invalid
-    # type of input size (not int).
-    sample_size = 100.0
-    test_input = [random.randint(0, 1000) for _ in range(100)]
-
-    with self.assertRaises(ValueError) as e:
-      with TestPipeline() as pipeline:
-        _ = (
-            pipeline
-            | 'create' >> beam.Create(test_input)
-            |
-            'get_estimate' >> 
beam.ApproximateUnique.Globally(size=sample_size))
-
-    expected_msg = beam.ApproximateUnique._INPUT_SIZE_ERR_MSG % (sample_size)
-
-    assert e.exception.args[0] == expected_msg
-
-  def test_approximate_unique_global_by_invalid_small_error(self):
-    # test if the transformation throws an error as expected with an invalid
-    # small input error (< 0.01).
-    est_err = 0.0
-    test_input = [random.randint(0, 1000) for _ in range(100)]
-
-    with self.assertRaises(ValueError) as e:
-      with TestPipeline() as pipeline:
-        _ = (
-            pipeline
-            | 'create' >> beam.Create(test_input)
-            | 'get_estimate' >> beam.ApproximateUnique.Globally(error=est_err))
-
-    expected_msg = beam.ApproximateUnique._INPUT_ERROR_ERR_MSG % (est_err)
-
-    assert e.exception.args[0] == expected_msg
-
-  def test_approximate_unique_global_by_invalid_big_error(self):
-    # test if the transformation throws an error as expected with an invalid
-    # big input error (> 0.50).
-    est_err = 0.6
-    test_input = [random.randint(0, 1000) for _ in range(100)]
-
-    with self.assertRaises(ValueError) as e:
-      with TestPipeline() as pipeline:
-        _ = (
-            pipeline
-            | 'create' >> beam.Create(test_input)
-            | 'get_estimate' >> beam.ApproximateUnique.Globally(error=est_err))
-
-    expected_msg = beam.ApproximateUnique._INPUT_ERROR_ERR_MSG % (est_err)
-
-    assert e.exception.args[0] == expected_msg
-
-  def test_approximate_unique_global_by_invalid_no_input(self):
-    # test if the transformation throws an error as expected with no input.
-    test_input = [random.randint(0, 1000) for _ in range(100)]
-
-    with self.assertRaises(ValueError) as e:
-      with TestPipeline() as pipeline:
-        _ = (
-            pipeline
-            | 'create' >> beam.Create(test_input)
-            | 'get_estimate' >> beam.ApproximateUnique.Globally())
-
-    expected_msg = beam.ApproximateUnique._NO_VALUE_ERR_MSG
-    assert e.exception.args[0] == expected_msg
-
-  def test_approximate_unique_global_by_invalid_both_input(self):
-    # test if the transformation throws an error as expected with multi input.
-    test_input = [random.randint(0, 1000) for _ in range(100)]
-    est_err = 0.2
-    sample_size = 30
-
-    with self.assertRaises(ValueError) as e:
-      with TestPipeline() as pipeline:
-        _ = (
-            pipeline
-            | 'create' >> beam.Create(test_input)
-            | 'get_estimate' >> beam.ApproximateUnique.Globally(
-                size=sample_size, error=est_err))
-
-    expected_msg = beam.ApproximateUnique._MULTI_VALUE_ERR_MSG % (
-        sample_size, est_err)
-
-    assert e.exception.args[0] == expected_msg
-
-  def test_get_sample_size_from_est_error(self):
-    # test if get correct sample size from input error.
-    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.5) == 16
-    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.4) == 25
-    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.2) == 100
-    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.1) == 400
-    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.05) == 1600
-    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.01) == 
40000
-
-  @unittest.skip(
-      'Skip it because hash function is not good enough. '
-      'TODO: BEAM-7654')
-  def test_approximate_unique_global_by_sample_size(self):
-    # test if estimation error with a given sample size is not greater than
-    # expected max error.
-    sample_size = 16
-    max_err = 2 / math.sqrt(sample_size)
-    test_input = [
-        4,
-        34,
-        29,
-        46,
-        80,
-        66,
-        51,
-        81,
-        31,
-        9,
-        26,
-        36,
-        10,
-        41,
-        90,
-        35,
-        33,
-        19,
-        88,
-        86,
-        28,
-        93,
-        38,
-        76,
-        15,
-        87,
-        12,
-        39,
-        84,
-        13,
-        32,
-        49,
-        65,
-        100,
-        16,
-        27,
-        23,
-        30,
-        96,
-        54
-    ]
-
-    actual_count = len(set(test_input))
-
-    with TestPipeline() as pipeline:
-      result = (
-          pipeline
-          | 'create' >> beam.Create(test_input)
-          | 'get_estimate' >> beam.ApproximateUnique.Globally(size=sample_size)
-          | 'compare' >> beam.FlatMap(
-              lambda x: [abs(x - actual_count) * 1.0 / actual_count <= 
max_err])
-      )
-
-      assert_that(result, equal_to([True]), label='assert:global_by_size')
-
-  @retry(reraise=True, stop=stop_after_attempt(5))
-  def test_approximate_unique_global_by_sample_size_with_duplicates(self):
-    # test if estimation error with a given sample size is not greater than
-    # expected max error with duplicated input.
-    sample_size = 30
-    max_err = 2 / math.sqrt(sample_size)
-    test_input = [10] * 50 + [20] * 50
-    actual_count = len(set(test_input))
-
-    with TestPipeline() as pipeline:
-      result = (
-          pipeline
-          | 'create' >> beam.Create(test_input)
-          | 'get_estimate' >> beam.ApproximateUnique.Globally(size=sample_size)
-          | 'compare' >> beam.FlatMap(
-              lambda x: [abs(x - actual_count) * 1.0 / actual_count <= 
max_err])
-      )
-
-      assert_that(
-          result,
-          equal_to([True]),
-          label='assert:global_by_size_with_duplicates')
-
-  @retry(reraise=True, stop=stop_after_attempt(5))
-  def 
test_approximate_unique_global_by_sample_size_with_small_population(self):
-    # test if estimation is exactly same to actual value when sample size is
-    # not smaller than population size (sample size > 100% of population).
-    sample_size = 31
-    test_input = [
-        144,
-        160,
-        229,
-        923,
-        390,
-        756,
-        674,
-        769,
-        145,
-        888,
-        809,
-        159,
-        222,
-        101,
-        943,
-        901,
-        876,
-        194,
-        232,
-        631,
-        221,
-        829,
-        965,
-        729,
-        35,
-        33,
-        115,
-        894,
-        827,
-        364
-    ]
-    actual_count = len(set(test_input))
 
-    with TestPipeline() as pipeline:
-      result = (
-          pipeline
-          | 'create' >> beam.Create(test_input)
-          | 'get_estimate' >> 
beam.ApproximateUnique.Globally(size=sample_size))
-
-      assert_that(
-          result,
-          equal_to([actual_count]),
-          label='assert:global_by_sample_size_with_small_population')
-
-  @unittest.skip(
-      'Skip because hash function is not good enough. '
-      'TODO: BEAM-7654')
-  def test_approximate_unique_global_by_error(self):
-    # test if estimation error from input error is not greater than input 
error.
-    est_err = 0.3
-    test_input = [
-        291,
-        371,
-        271,
-        126,
-        762,
-        391,
-        222,
-        565,
-        428,
-        786,
-        801,
-        867,
-        337,
-        690,
-        261,
-        436,
-        311,
-        568,
-        946,
-        722,
-        973,
-        386,
-        506,
-        546,
-        991,
-        450,
-        226,
-        889,
-        514,
-        693
-    ]
+@parameterized_class(('sys.modules[\'mmh3\']', ), mmh3_options)
+class ApproximateUniqueTest(unittest.TestCase):
+  """Unit tests for ApproximateUnique.Globally, ApproximateUnique.PerKey,
+  and ApproximateUniqueCombineFn.
+  """
+  random.seed(0)
+  sys.modules['mmh3'] = None

Review comment:
       That was deliberate, but it bothered me as well. Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 481251)
    Remaining Estimate: 7h 50m  (was: 8h)
            Time Spent: 16h 10m  (was: 16h)

> Hash in stats.ApproximateUniqueCombineFn NON-deterministic
> ----------------------------------------------------------
>
>                 Key: BEAM-10824
>                 URL: https://issues.apache.org/jira/browse/BEAM-10824
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Monica Song
>            Priority: P1
>              Labels: hash
>   Original Estimate: 24h
>          Time Spent: 16h 10m
>  Remaining Estimate: 7h 50m
>
> The Python hash() function is non-deterministic. As a result, different 
> workers will map identical values to different hashes. This leads to 
> overestimation of the number of unique values (by several orders of 
> magnitude; in my experience, roughly 1000x) in a distributed processing model. 
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/stats.py#L218]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to