Brian Hulette created BEAM-11393:
------------------------------------
Summary: Support grouping by a Series
Key: BEAM-11393
URL: https://issues.apache.org/jira/browse/BEAM-11393
Project: Beam
Issue Type: Improvement
Components: sdk-py-core
Reporter: Brian Hulette
grouping by a Series (e.g. \{{df.groupby(df.column)}},
\{{series.groupby(other_series)}}) does not work. The previous implementation
relied on aligning the index between the two deferred frames, but it's possible
that one or both frames will have duplicate values in their index. Leading to
the following error at execution time:
{code}
Traceback (most recent call last):
File
"/usr/local/google/home/bhulette/working_dir/beam/sdks/python/apache_beam/dataframe/doctests.py",
line 237, in fix
computed = self.compute(to_compute)
File
"/usr/local/google/home/bhulette/working_dir/beam/sdks/python/apache_beam/dataframe/doctests.py",
line 195, in compute_using_session
return {
File
"/usr/local/google/home/bhulette/working_dir/beam/sdks/python/apache_beam/dataframe/doctests.py",
line 196, in <dictcomp>
name: frame._expr.evaluate_at(session)
File
"/usr/local/google/home/bhulette/working_dir/beam/sdks/python/apache_beam/dataframe/expressions.py",
line 329, in evaluate_at
return self._func(*(session.evaluate(arg) for arg in self._args))
File
"/usr/local/google/home/bhulette/working_dir/beam/sdks/python/apache_beam/dataframe/expressions.py",
line 329, in <genexpr>
return self._func(*(session.evaluate(arg) for arg in self._args))
File
"/usr/local/google/home/bhulette/working_dir/beam/sdks/python/apache_beam/dataframe/expressions.py",
line 144, in evaluate
result = evaluate_with(input_partitioning)
File
"/usr/local/google/home/bhulette/working_dir/beam/sdks/python/apache_beam/dataframe/expressions.py",
line 114, in evaluate_with
results.append(session.evaluate(expr))
File
"/usr/local/google/home/bhulette/working_dir/beam/sdks/python/apache_beam/dataframe/expressions.py",
line 42, in evaluate
self._bindings[expr] = expr.evaluate_at(self)
File
"/usr/local/google/home/bhulette/working_dir/beam/sdks/python/apache_beam/dataframe/expressions.py",
line 329, in evaluate_at
return self._func(*(session.evaluate(arg) for arg in self._args))
File
"/usr/local/google/home/bhulette/working_dir/beam/sdks/python/apache_beam/dataframe/frames.py",
line 149, in set_index
df, by = df.align(by, axis=0, join='inner')
File
"/usr/local/google/home/bhulette/.pyenv/versions/beam/lib/python3.8/site-packages/pandas/core/frame.py",
line 3962, in align
return super().align(
File
"/usr/local/google/home/bhulette/.pyenv/versions/beam/lib/python3.8/site-packages/pandas/core/generic.py",
line 8559, in align
return self._align_series(
File
"/usr/local/google/home/bhulette/.pyenv/versions/beam/lib/python3.8/site-packages/pandas/core/generic.py",
line 8681, in _align_series
fdata = fdata.reindex_indexer(join_index, lidx, axis=1)
File
"/usr/local/google/home/bhulette/.pyenv/versions/beam/lib/python3.8/site-packages/pandas/core/internals/managers.py",
line 1276, in reindex_indexer
self.axes[axis]._can_reindex(indexer)
File
"/usr/local/google/home/bhulette/.pyenv/versions/beam/lib/python3.8/site-packages/pandas/core/indexes/base.py",
line 3289, in _can_reindex
raise ValueError("cannot reindex from a
duplicate axis")
ValueError: cannot reindex from a duplicate axis
{code}
Discovered in https://github.com/apache/beam/pull/13401, GHA run:
https://github.com/apache/beam/runs/1445605501
--
This message was sent by Atlassian Jira
(v8.3.4#803005)