pcoet commented on a change in pull request #14863:
URL: https://github.com/apache/beam/pull/14863#discussion_r637930080
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -92,6 +92,24 @@ def wrapper(self, *args, **kwargs):
frame_base.populate_defaults(pd.DataFrame)(wrapper)))
+LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'sum']
+LIFTABLE_WITH_SUM_AGGREGATIONS = ['size', 'count']
+UNLIFTABLE_AGGREGATIONS = ['mean', 'median', 'std', 'var']
+
+
+def _agg_method(base, func):
+ def wrapper(self, *args, **kwargs):
+ return self.agg(func, *args, **kwargs)
+
+ if func in UNLIFTABLE_AGGREGATIONS:
+ wrapper.__doc__ = (
+ f"``{func}`` cannot currently be parallelized, it will "
Review comment:
"cannot currently be parallelized, it will" -> "cannot currently be
parallelized. It will"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -115,10 +115,14 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
__array__ = frame_base.wont_implement_method(
pd.Series, '__array__', reason="non-deferred-result")
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def drop(self, labels, axis, index, columns, errors, **kwargs):
+ """drop is not parallelizable when dropping from the index and
+ errors="raise" specified. It requires collecting all data on a single node
Review comment:
'errors="raise" specified' -> 'errors="raise" is specified'
##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -14,6 +14,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Sources and sinks for the Beam DataFrame API.
+
+Sources
+#######
+This module provides analogs for pandas ``read`` methods, like
+:func:`pandas.read_csv`. However, Beam sources like :func:`read_csv`
+create a Beam :class:`~apache_beam.PTransform`, and return a
+:class:`~apache_beam.dataframe.frames.DeferredDataFrame` or
+:class:`~apache_beam.dataframe.frames.DeferredSeries` representing the contents
+of the referenced file(s) or data source.
+
+The result of these methods must be applied to a :class:`~apache_beam.Pipeline`
+object, for example::
+
+ df = p | beam.dataframe.io.read_csv(...)
+
+Sinks
+#####
+This module also defines analogs for pandas sink, or ``to``, methods that
+generate a Beam :class:`~apache_beam.PTransform`. Generally these should be
Review comment:
I wonder what "Generally" means here.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -173,10 +180,13 @@ def droplevel(self, level, axis):
preserves_partition_by=partitionings.Arbitrary()
if axis in (1, 'column') else partitionings.Singleton()))
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def fillna(self, value, method, axis, limit, **kwargs):
+ """When axis="index", both method and limit must be None,
Review comment:
"None," -> "None."
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -367,9 +384,14 @@ def map_index(df):
'astype', base=pd.core.generic.NDFrame)
copy = frame_base._elementwise_method('copy', base=pd.core.generic.NDFrame)
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def tz_localize(self, ambiguous, **kwargs):
+ """``ambiguous`` may not be set to 'infer' as it's semantics are
Review comment:
"it's" -> "its"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -367,9 +384,14 @@ def map_index(df):
'astype', base=pd.core.generic.NDFrame)
copy = frame_base._elementwise_method('copy', base=pd.core.generic.NDFrame)
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def tz_localize(self, ambiguous, **kwargs):
+ """``ambiguous`` may not be set to 'infer' as it's semantics are
+ order-sensitive. Similarly specifying ``ambiguous`` as an ndarray is
Review comment:
"Similarly specifying" -> "Similarly, specifying"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -993,27 +1049,42 @@ def _wrap_in_df(self):
preserves_partition_by=partitionings.Arbitrary(),
))
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def duplicated(self, keep):
+ """Only keep=False and keep="any" are supported, other values of keep make
+ this an order-sensitive operation. Note keep="any" is a Beam-specific
+ option that guarantees only one duplicate will be kept, but unlike "first"
+ and "last" it makes no guarantees about _which_ duplicate element is
+ kept."""
 # Re-use the DataFrame-based duplicated, extract the series back out
df = self._wrap_in_df()
return df.duplicated(keep=keep)[df.columns[0]]
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def drop_duplicates(self, keep):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
"supported, other" -> "supported. Other"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -367,9 +384,14 @@ def map_index(df):
'astype', base=pd.core.generic.NDFrame)
copy = frame_base._elementwise_method('copy', base=pd.core.generic.NDFrame)
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def tz_localize(self, ambiguous, **kwargs):
+ """``ambiguous`` may not be set to 'infer' as it's semantics are
+ order-sensitive. Similarly specifying ``ambiguous`` as an ndarray is
+ order-sensitive, but you can achieve similar functionality by specifying
+ ambiguous as a Series."""
Review comment:
ambiguous -> ``ambiguous``
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1163,9 +1235,15 @@ def axes(self):
__contains__ = frame_base.wont_implement_method(
pd.Series, '__contains__', reason="non-deferred-result")
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def nlargest(self, keep, **kwargs):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
"supported, other" -> "supported. Other"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -993,27 +1049,42 @@ def _wrap_in_df(self):
preserves_partition_by=partitionings.Arbitrary(),
))
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def duplicated(self, keep):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
"supported, other" -> "supported. Other"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1190,9 +1268,15 @@ def nlargest(self, keep, **kwargs):
preserves_partition_by=partitionings.Arbitrary(),
requires_partition_by=partitionings.Singleton()))
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def nsmallest(self, keep, **kwargs):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
"supported, other" -> "supported. Other"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -2166,9 +2279,15 @@ def merge(
return merged.reset_index(drop=True)
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def nlargest(self, keep, **kwargs):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
"supported, other values" -> "supported. Other values"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1290,12 +1380,18 @@ def replace(self, to_replace, value, limit, method,
**kwargs):
to_frame = frame_base._elementwise_method('to_frame', base=pd.Series)
+ @frame_base.with_docs_from(pd.Series)
def unique(self, as_series=False):
+ """unique() is not supported by default because it produces a
+ non-deferred result, a numpy array. You may use the Beam-specific argument
Review comment:
"non-deferred result, a numpy array. You may use the Beam-specific
argument" -> "non-deferred result: a numpy array. You can use the Beam-specific
argument"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -993,27 +1049,42 @@ def _wrap_in_df(self):
preserves_partition_by=partitionings.Arbitrary(),
))
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def duplicated(self, keep):
+ """Only keep=False and keep="any" are supported, other values of keep make
+ this an order-sensitive operation. Note keep="any" is a Beam-specific
+ option that guarantees only one duplicate will be kept, but unlike "first"
Review comment:
Well, based on the reStructuredText documentation, I think I'd use
double backticks (``foo``) for arg names and other literals. It's not a big
deal. I'm not going to call it out again. But it could be a useful convention.
I also took a quick look at the Pandas doc strings, and it seems like they're
not very strict or consistent about it. So just a nit.
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1614,10 +1718,16 @@ def duplicated(self, keep, subset):
lambda df: pd.DataFrame(df.duplicated(keep=keep, subset=subset),
columns=[None]))[None]
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def drop_duplicates(self, keep, subset, ignore_index):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
"supported, other values" -> "supported. Other values"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -367,9 +384,14 @@ def map_index(df):
'astype', base=pd.core.generic.NDFrame)
copy = frame_base._elementwise_method('copy', base=pd.core.generic.NDFrame)
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def tz_localize(self, ambiguous, **kwargs):
+ """``ambiguous`` may not be set to 'infer' as it's semantics are
Review comment:
"may not" -> "cannot"
As a general rule, "may" is for permission or an uncertain outcome ("You may
find that passing in `foo` improves performance"), and "can" is for
possibility. So if we prevent a user from doing something (i.e. it's not
possible), we usually say, "You cannot do foo..."
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -173,10 +180,13 @@ def droplevel(self, level, axis):
preserves_partition_by=partitionings.Arbitrary()
if axis in (1, 'column') else partitionings.Singleton()))
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def fillna(self, value, method, axis, limit, **kwargs):
+ """When axis="index", both method and limit must be None,
+ otherwise this operation is order-sensitive."""
Review comment:
"otherwise" -> "Otherwise"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -993,27 +1049,42 @@ def _wrap_in_df(self):
preserves_partition_by=partitionings.Arbitrary(),
))
+ @frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def duplicated(self, keep):
+ """Only keep=False and keep="any" are supported, other values of keep make
+ this an order-sensitive operation. Note keep="any" is a Beam-specific
+ option that guarantees only one duplicate will be kept, but unlike "first"
Review comment:
I'm not sure if there's a convention around double quotes versus
backticks for names in these doc strings. I would probably prefer backticks
(`first`) to quotes. It seems like this is mostly a matter of
[preference](https://stackoverflow.com/questions/22256995/restructuredtext-in-sphinx-and-docstrings-single-vs-double-back-quotes-or-back).
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1592,10 +1690,16 @@ def func_elementwise(df):
self._expr = inserted._expr
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def duplicated(self, keep, subset):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
"supported, other values" -> "supported. Other values"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1248,10 +1333,15 @@ def set_index(s):
rename_axis = frame_base._elementwise_method('rename_axis', base=pd.Series)
+ @frame_base.with_docs_from(pd.Series, name='is_unique')
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def replace(self, to_replace, value, limit, method, **kwargs):
+ """`method` is not supported in the Beam DataFrame API because it is
+ order-sensitive, it must not be specified.
Review comment:
"order-sensitive, it must not be specified." -> "order-sensitive. It
cannot be specified."
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -2192,9 +2311,15 @@ def nlargest(self, keep, **kwargs):
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton()))
+ @frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def nsmallest(self, keep, **kwargs):
+ """Only keep=False and keep="any" are supported, other values of keep make
Review comment:
"supported, other values" -> "supported. Other values"
##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1290,12 +1380,18 @@ def replace(self, to_replace, value, limit, method,
**kwargs):
to_frame = frame_base._elementwise_method('to_frame', base=pd.Series)
+ @frame_base.with_docs_from(pd.Series)
def unique(self, as_series=False):
+ """unique() is not supported by default because it produces a
+ non-deferred result, a numpy array. You may use the Beam-specific argument
+ ``unique(as_series=True)`` to get the result as a
:class:`DeferredSeries`"""
+
if not as_series:
raise frame_base.WontImplementError(
"unique() is not supported by default because it produces a "
- "non-deferred result, a numpy array. You may call it with "
- "unique(as_series=True) to get the result as a DeferredSeries",
+ "non-deferred result, a numpy array. You may use the Beam-specific "
Review comment:
"non-deferred result, a numpy array. You may" -> "non-deferred result: a
numpy array. You can"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]