This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f504c5309bc Revert "[Python] Add take(n) convenience for PCollection 
(#37470)" (#37493)
f504c5309bc is described below

commit f504c5309bcfe95ab99035d9d71c9b99f1ca2e43
Author: tvalentyn <[email protected]>
AuthorDate: Fri Feb 6 06:02:36 2026 -0800

    Revert "[Python] Add take(n) convenience for PCollection (#37470)" (#37493)
    
    This reverts commit fd2babbcddba02e3e0a8712d12980da9578d7b7e.
---
 CHANGES.md                                      |  1 -
 sdks/python/apache_beam/pvalue.py               | 19 -------
 sdks/python/apache_beam/transforms/util.py      | 74 +------------------------
 sdks/python/apache_beam/transforms/util_test.py | 39 -------------
 4 files changed, 1 insertion(+), 132 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index f4a04320d66..ed92cf58050 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -70,7 +70,6 @@
 ## New Features / Improvements
 
 * (Python) Added exception chaining to preserve error context in 
CloudSQLEnrichmentHandler, processes utilities, and core transforms 
([#37422](https://github.com/apache/beam/issues/37422)).
-* (Python) Added `take(n)` convenience for PCollection: `beam.take(n)` and 
`pcoll.take(n)` to get the first N elements deterministically without Top.Of + 
FlatMap ([#X](https://github.com/apache/beam/issues/37429)).
 * X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
 
 ## Breaking Changes
diff --git a/sdks/python/apache_beam/pvalue.py 
b/sdks/python/apache_beam/pvalue.py
index d09a0040bd7..1cd220cc256 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -176,25 +176,6 @@ class PCollection(PValue, Generic[T]):
       is_bounded = pcoll.is_bounded
     return PCollection(pcoll.pipeline, is_bounded=is_bounded)
 
-  def take(self, n: int) -> 'PCollection[T]':
-    """Takes the first N elements from this PCollection.
-
-    This is a convenience method that returns a new PCollection containing
-    at most N elements from this PCollection. The elements are taken
-    deterministically (not randomly sampled).
-
-    Args:
-      n: Number of elements to take. Must be a positive integer.
-
-    Returns:
-      A new PCollection containing at most N elements.
-
-    Example::
-      first_10 = pcoll.take(10)
-    """
-    from apache_beam.transforms import util
-    return self | util.take(n)
-
   def to_runner_api(
       self, context: 'PipelineContext') -> beam_runner_api_pb2.PCollection:
     return beam_runner_api_pb2.PCollection(
diff --git a/sdks/python/apache_beam/transforms/util.py 
b/sdks/python/apache_beam/transforms/util.py
index dd14bd8f57b..fbaab6b4ebb 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -54,7 +54,6 @@ from apache_beam.pvalue import AsSideInput
 from apache_beam.pvalue import PCollection
 from apache_beam.transforms import window
 from apache_beam.transforms.combiners import CountCombineFn
-from apache_beam.transforms.combiners import Top
 from apache_beam.transforms.core import CombinePerKey
 from apache_beam.transforms.core import Create
 from apache_beam.transforms.core import DoFn
@@ -106,13 +105,11 @@ __all__ = [
     'Reshuffle',
     'Secret',
     'ToString',
-    'Take',
     'Tee',
     'Values',
     'WithKeys',
     'GroupIntoBatches',
-    'WaitOn',
-    'take',
+    'WaitOn'
 ]
 
 K = TypeVar('K')
@@ -1970,75 +1967,6 @@ class LogElements(PTransform):
         ))
 
 
[email protected]_input_types(T)
[email protected]_output_types(T)
-class Take(PTransform):
-  """Takes the first N elements from a PCollection.
-
-  This transform returns a PCollection containing at most N elements from the
-  input PCollection. The elements are taken deterministically (not randomly
-  sampled).
-
-  Args:
-    n: Number of elements to take. Must be a positive integer.
-
-  Returns:
-    A PCollection containing at most N elements.
-
-  Example::
-    # Take first 10 elements
-    first_10 = pcoll | beam.take(10)
-
-    # Or as a method
-    first_10 = pcoll.take(10)
-  """
-  def __init__(self, n):
-    """Initializes Take transform.
-
-    Args:
-      n: Number of elements to take. Must be positive.
-    """
-    if n <= 0:
-      raise ValueError('n must be positive, got %d' % n)
-    self._n = n
-
-  def expand(self, pcoll):
-    """Expands the Take transform.
-
-    Args:
-      pcoll: Input PCollection.
-
-    Returns:
-      A PCollection containing at most N elements.
-    """
-    # Use Top.Of with a constant key to get first N elements deterministically.
-    # Top.Of returns a list, so we flatten it to get individual elements.
-    return (
-        pcoll
-        | Top.Of(self._n, key=lambda x: 0).without_defaults()
-        | FlatMap(lambda elements: elements))
-
-  def default_label(self):
-    return 'Take(%d)' % self._n
-
-
-def take(n):
-  """Convenience function for Take transform.
-
-  Takes the first N elements from a PCollection.
-
-  Args:
-    n: Number of elements to take. Must be positive.
-
-  Returns:
-    A Take transform instance.
-
-  Example::
-    first_10 = pcoll | beam.take(10)
-  """
-  return Take(n)
-
-
 class Reify(object):
   """PTransforms for converting between explicit and implicit form of various
   Beam values."""
diff --git a/sdks/python/apache_beam/transforms/util_test.py 
b/sdks/python/apache_beam/transforms/util_test.py
index 448ba8a7ad9..7389568691c 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -1934,45 +1934,6 @@ class ToStringTest(unittest.TestCase):
       assert_that(result, equal_to(["one1", "two2"]))
 
 
-class TakeTest(unittest.TestCase):
-  def test_take_function_syntax(self):
-    with TestPipeline() as p:
-      result = p | beam.Create([1, 2, 3, 4, 5]) | util.take(3)
-      assert_that(result, equal_to([1, 2, 3]))
-
-  def test_take_method_syntax(self):
-    with TestPipeline() as p:
-      pcoll = p | beam.Create([10, 20, 30, 40, 50])
-      result = pcoll.take(2)
-      assert_that(result, equal_to([10, 20]))
-
-  def test_take_more_than_available(self):
-    with TestPipeline() as p:
-      result = p | beam.Create([1, 2, 3]) | util.take(10)
-      assert_that(result, equal_to([1, 2, 3]))
-
-  def test_take_single_element(self):
-    with TestPipeline() as p:
-      result = p | beam.Create([100, 200, 300]) | util.take(1)
-      assert_that(result, equal_to([100]))
-
-  def test_take_all_elements(self):
-    with TestPipeline() as p:
-      data = [1, 2, 3, 4, 5]
-      result = p | beam.Create(data) | util.take(len(data))
-      assert_that(result, equal_to(data))
-
-  def test_take_invalid_n_zero(self):
-    with self.assertRaises(ValueError) as ctx:
-      util.Take(0)
-    self.assertIn('n must be positive', str(ctx.exception))
-
-  def test_take_invalid_n_negative(self):
-    with self.assertRaises(ValueError) as ctx:
-      util.Take(-1)
-    self.assertIn('n must be positive', str(ctx.exception))
-
-
 class LogElementsTest(unittest.TestCase):
   @pytest.fixture(scope="function")
   def _capture_stdout_log(request, capsys):

Reply via email to