[ 
https://issues.apache.org/jira/browse/BEAM-6694?focusedWorklogId=287693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-287693
 ]

ASF GitHub Bot logged work on BEAM-6694:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Aug/19 11:22
            Start Date: 02/Aug/19 11:22
    Worklog Time Spent: 10m 
      Work Description: mszb commented on pull request #9153: [BEAM-6694] Added 
Approximate Quantile Transfrom on Python SDK
URL: https://github.com/apache/beam/pull/9153#discussion_r310089542
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/stats_test.py
 ##########
 @@ -345,5 +353,313 @@ def 
test_approximate_unique_globally_by_error_with_skewed_data(self):
     pipeline.run()
 
 
+class ApproximateQuantilesTest(unittest.TestCase):
+  _kv_data = [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 10), ("b", 10),
+              ("b", 100)]
+  _kv_str_data = [("a", "a"), ("a", "a"*2), ("a", "a"*3), ("b", "b"),
+                  ("b", "b"*10), ("b", "b"*10), ("b", "b"*100)]
+
+  @staticmethod
+  def _quantiles_matcher(expected):
+    l = len(expected)
+
+    def assert_true(exp):
+      if not exp:
+        raise BeamAssertException('%s Failed assert True' % repr(exp))
+
+    def match(actual):
+      actual = actual[0]
+      for i in range(l):
+        if isinstance(expected[i], list):
+          assert_true(expected[i][0] <= actual[i] <= expected[i][1])
+        else:
+          equal_to([expected[i]])([actual[i]])
+
+    return match
+
+  @staticmethod
+  def _approx_quantile_generator(size, num_of_quantiles, absoluteError):
+    quantiles = [0]
+    k = 1
+    while k < num_of_quantiles - 1:
+      expected = (size - 1) * k / (num_of_quantiles - 1)
+      quantiles.append([expected - absoluteError, expected + absoluteError])
+      k = k + 1
+    quantiles.append(size - 1)
+    return quantiles
+
+  def test_quantiles_globaly(self):
+    with TestPipeline() as p:
+      pc = p | Create(range(101))
+      quantiles = pc | beam.ApproximateQuantiles.Globally(5)
+      assert_that(quantiles, equal_to([[0, 25, 50, 75, 100]]))
+
+  def test_quantiles_globaly_comparable(self):
+    with TestPipeline() as p:
+      data = range(101)
+      comparator = lambda a, b: b - a  # descending comparator
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(5, comparator)
+      assert_that(quantiles, equal_to([[100, 75, 50, 25, 0]]))
+
+    with TestPipeline() as p:
+      data = range(101)
+      comparator = lambda a, b: b - a  # descending comparator
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(5, comparator,
+                                                          reverse=True)
+      assert_that(quantiles, equal_to([[0, 25, 50, 75, 100]]))
+
+    with TestPipeline() as p:
+      data = range(101)
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(5, reverse=True)
+      assert_that(quantiles, equal_to([[100, 75, 50, 25, 0]]))
+
+  def test_quantiles_per_key(self):
+    with TestPipeline() as p:
+      data = self._kv_data
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.PerKey(2)
+      assert_that(quantiles, equal_to([('a', [1, 3]), ('b', [1, 100])]))
+
+  def test_quantiles_per_key_descending_order(self):
+    with TestPipeline() as p:
+      data = self._kv_data
+      comparator = lambda a, b: b - a  # descending comparator
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.PerKey(2, comparator)
+      assert_that(quantiles, equal_to([('a', [3, 1]), ('b', [100, 1])]))
+
+    with TestPipeline() as p:
+      data = self._kv_data
+      comparator = lambda a, b: b - a  # descending comparator
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.PerKey(2, comparator,
+                                                        reverse=True)
+      assert_that(quantiles, equal_to([('a', [1, 3]), ('b', [1, 100])]))
+
+    with TestPipeline() as p:
+      data = self._kv_data
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.PerKey(2, reverse=True)
+      assert_that(quantiles, equal_to([('a', [3, 1]), ('b', [100, 1])]))
+
+  def test_quantiles_per_key_with_key_argument(self):
+    with TestPipeline() as p:
+      data = self._kv_str_data
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.PerKey(2, key=len)
+      assert_that(quantiles, equal_to([('a', ['a', 'a' * 3]),
+                                       ('b', ['b', 'b' * 100])]))
+
+    with TestPipeline() as p:
+      data = self._kv_str_data
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.PerKey(2, key=len,
+                                                        reverse=True)
+      assert_that(quantiles, equal_to([('a', ['a'*3, 'a']),
+                                       ('b', ['b'*100, 'b'])]))
+
+  def test_singleton(self):
+    with TestPipeline() as p:
+      data = [389]
+      pc = p | Create(data)
+      qunatiles = pc | beam.ApproximateQuantiles.Globally(5)
+      assert_that(qunatiles, equal_to([[389, 389, 389, 389, 389]]))
+
+  def test_uneven_quantiles(self):
+    with TestPipeline() as p:
+      data = range(5000)
+      pc = p | Create(data)
+      qunatiles = pc | beam.ApproximateQuantiles.Globally(37)
+      aprox_quantiles = self._approx_quantile_generator(size=5000,
+                                                        num_of_quantiles=37,
+                                                        absoluteError=20)
+      assert_that(qunatiles, self._quantiles_matcher(aprox_quantiles))
+
+  def test_large_quantiles(self):
+    with TestPipeline() as p:
+      data = range(10001)
+      pc = p | Create(data)
+      qunatiles = pc | beam.ApproximateQuantiles.Globally(50)
+      aprox_quantiles = self._approx_quantile_generator(size=10001,
+                                                        num_of_quantiles=50,
+                                                        absoluteError=20)
+      assert_that(qunatiles, self._quantiles_matcher(aprox_quantiles))
+
+  def test_random_combines(self):
+    with TestPipeline() as p:
+      data = list(range(101))
+      random.shuffle(data)
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(5)
+      assert_that(quantiles, equal_to([[0, 25, 50, 75, 100]]))
+
+  def test_duplicats(self):
+    with TestPipeline() as p:
+      y = list(range(101))
+      data = []
+      for _ in range(10):
+        data.extend(y)
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(5)
+      assert_that(quantiles, equal_to([[0, 25, 50, 75, 100]]))
+
+  def test_lots_of_duplicats(self):
+    with TestPipeline() as p:
+      data = [1]
+      data.extend([2 for _ in range(299)])
+      data.extend([3 for _ in range(799)])
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(5)
+      assert_that(quantiles, equal_to([[1, 2, 3, 3, 3]]))
+
+  def test_log_distribution(self):
+    with TestPipeline() as p:
+      data = [int(math.log(x)) for x in range(1, 1000)]
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(5)
+      assert_that(quantiles, equal_to([[0, 5, 6, 6, 6]]))
+
+  def test_zipfian_distribution(self):
+    with TestPipeline() as p:
+      data = []
+      for i in range(1, 1000):
+        data.append(int(1000 / i))
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(5)
+      assert_that(quantiles, equal_to([[1, 1, 2, 4, 1000]]))
+
+  def test_alternate_comparator(self):
+    data = ["aa", "aaa", "aaaa", "b", "ccccc", "dddd", "zz"]
+    with TestPipeline() as p:
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(3)
+      assert_that(quantiles, equal_to([["aa", "b", "zz"]]))
+
+    with TestPipeline() as p:
+      pc = p | Create(data)
+      comparator = lambda a, b: len(a) - len(b)  # order by length
+      quantiles = pc | beam.ApproximateQuantiles.Globally(3, comparator)
+      assert_that(quantiles, equal_to([["b", "aaa", "ccccc"]]))
+
+    with TestPipeline() as p:
+      pc = p | Create(data)
+      comparator = lambda a, b: len(a) - len(b)  # order by length
+      quantiles = pc | beam.ApproximateQuantiles.Globally(3, comparator,
+                                                          reverse=True)
+      assert_that(quantiles, equal_to([["ccccc", "aaa", "b"]]))
+
+    with TestPipeline() as p:
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(3, key=len)
+      assert_that(quantiles, equal_to([["b", "aaa", "ccccc"]]))
+
+    with TestPipeline() as p:
+      pc = p | Create(data)
+      quantiles = pc | beam.ApproximateQuantiles.Globally(3, key=len,
+                                                          reverse=True)
+      assert_that(quantiles, equal_to([["ccccc", "aaa", "b"]]))
+
+  def test_global_display_data(self):
+    comparator = lambda a, b: a - b  # order by length
+    aq = beam.ApproximateQuantiles.Globally(3, comparator, key=len,
+                                            reverse=True)
+    data = DisplayData.create_from(aq)
+    expected_items = [
+        DisplayDataItemMatcher('num_quantiles', aq._num_quantiles),
+        DisplayDataItemMatcher('compare', aq._compare.__class__),
+        DisplayDataItemMatcher('key', aq._key.__class__),
+        DisplayDataItemMatcher('reverse', aq._reverse.__class__)
+    ]
+    hc.assert_that(data.items, hc.contains_inanyorder(*expected_items))
+
+  def test_perkey_display_data(self):
+    comparator = lambda a, b: a - b  # order by length
+    aq = beam.ApproximateQuantiles.PerKey(3, comparator, key=len, reverse=True)
+    data = DisplayData.create_from(aq)
+    expected_items = [
+        DisplayDataItemMatcher('num_quantiles', aq._num_quantiles),
+        DisplayDataItemMatcher('compare', aq._compare.__class__),
+        DisplayDataItemMatcher('key', aq._key.__class__),
+        DisplayDataItemMatcher('reverse', aq._reverse.__class__)
+    ]
+    hc.assert_that(data.items, hc.contains_inanyorder(*expected_items))
+
+
+def _build_quantilebuffer_test_data():
 
 Review comment:
   I didn't get your point which says, _"that does not require a new **test 
data file**"_  & ... _"**256-512**"_.
   
   Can you please elaborate?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 287693)
    Time Spent: 7h 50m  (was: 7h 40m)

> ApproximateQuantiles transform for Python SDK
> ---------------------------------------------
>
>                 Key: BEAM-6694
>                 URL: https://issues.apache.org/jira/browse/BEAM-6694
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Shehzaad Nakhoda
>            Priority: Minor
>          Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> Add PTransforms for getting an idea of a PCollection's data distribution 
> using approximate N-tiles (e.g. quartiles, percentiles, etc.), either 
> globally or per-key.
> It should offer the same API as its Java counterpart: 
> https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to