[https://issues.apache.org/jira/browse/BEAM-3376?focusedWorklogId=499939&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-499939]
ASF GitHub Bot logged work on BEAM-3376:
----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Oct/20 09:21
Start Date: 13/Oct/20 09:21
Worklog Time Spent: 10m
Work Description: kamilwu commented on a change in pull request #13081:
URL: https://github.com/apache/beam/pull/13081#discussion_r503794860
##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -888,13 +888,32 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
5. The extract_output operation is invoked on the final accumulator to get
the output value.
- Note: If this **CombineFn** is used with a transform that has defaults,
- **apply** will be called with an empty list at expansion time to get the
- default value.
"""
def default_label(self):
return self.__class__.__name__
+ def default_value(self, *args, **kwargs):
+ """Returns a default reduction of an empty input.
+
+ Some combiners require a default value when reducing an empty collection,
+ which may be necessary when combining elements in an empty window.
+
+ If **CombineFn** is used with a transform that requires defaults,
+ default_value may be called during transform expansion.
+
+ Args:
+ *args: Additional arguments and side inputs.
+ **kwargs: Additional arguments and side inputs.
+ """
+ # Defalut values may be evaluated at pipeline construction time.
Review comment:
nit: Default
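The docstring above says `default_value` returns a reduction of an empty input. The following standalone sketch illustrates that idea without importing Apache Beam. `CombineFnSketch`, `SumFn`, and `MeanFn` are hypothetical classes, and the body of `default_value` (copy the combiner, create a fresh accumulator, extract its output) is an assumption for illustration, not the code in the pull request.

```python
import copy


class CombineFnSketch:
    """Hypothetical stand-in for a CombineFn; not Beam's class."""

    def create_accumulator(self):
        raise NotImplementedError

    def add_input(self, accumulator, element):
        raise NotImplementedError

    def extract_output(self, accumulator):
        raise NotImplementedError

    def default_value(self, *args, **kwargs):
        # Assumed approach: work on a copy so that state touched while the
        # default is computed at construction time stays off the original.
        combine_copy = copy.deepcopy(self)
        # An empty input reduces to the output of a fresh, untouched accumulator.
        return combine_copy.extract_output(combine_copy.create_accumulator())


class SumFn(CombineFnSketch):
    """Hypothetical summing combiner; the empty sum is 0."""

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        return accumulator + element

    def extract_output(self, accumulator):
        return accumulator


class MeanFn(CombineFnSketch):
    """Hypothetical mean combiner; here the mean of nothing is NaN."""

    def create_accumulator(self):
        return (0.0, 0)  # (running sum, element count)

    def add_input(self, accumulator, element):
        total, count = accumulator
        return total + element, count + 1

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")


print(SumFn().default_value())   # 0
print(MeanFn().default_value())  # nan
```

Each combiner decides what "empty" means through its own `create_accumulator` and `extract_output`, so a single generic `default_value` can serve all of them.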
##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -888,13 +888,32 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
5. The extract_output operation is invoked on the final accumulator to get
the output value.
- Note: If this **CombineFn** is used with a transform that has defaults,
- **apply** will be called with an empty list at expansion time to get the
- default value.
"""
def default_label(self):
return self.__class__.__name__
+ def default_value(self, *args, **kwargs):
+ """Returns a default reduction of an empty input.
+
+ Some combiners require a default value when reducing an empty collection,
+ which may be necessary when combining elements in an empty window.
+
+ If **CombineFn** is used with a transform that requires defaults,
+ default_value may be called during transform expansion.
+
+ Args:
+ *args: Additional arguments and side inputs.
+ **kwargs: Additional arguments and side inputs.
+ """
+ # Defalut values may be evaluated at pipeline construction time.
+ # Make a copy to avoid passing any side-effects to the serialized pipeline
+ # representaiton.
+ combine_copy = copy.copy(self)
Review comment:
The user can provide a combiner that has nested combiners, e.g.
`TupleCombineFn`, so we should make a deep copy instead.
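The review comment's point about nested combiners can be demonstrated without Beam. In the following sketch, `TupleOfCombiners` is a hypothetical stand-in for a composite combiner such as `TupleCombineFn` (it is not Beam's implementation), and the `calls` counter is hypothetical mutable state. A shallow `copy.copy` duplicates only the outer object, so the nested combiners are still shared with the original and any state they mutate leaks back. `copy.deepcopy` duplicates the nested combiners as well.

```python
import copy


class CountingSumFn:
    """Hypothetical leaf combiner that records how often it was used."""

    def __init__(self):
        self.calls = 0  # mutable state standing in for any side effect

    def create_accumulator(self):
        self.calls += 1
        return 0

    def extract_output(self, accumulator):
        return accumulator


class TupleOfCombiners:
    """Hypothetical composite combiner holding nested combiners."""

    def __init__(self, *combiners):
        self.combiners = list(combiners)

    def create_accumulator(self):
        return [c.create_accumulator() for c in self.combiners]

    def extract_output(self, accumulators):
        return tuple(
            c.extract_output(a) for c, a in zip(self.combiners, accumulators))


def default_from(combiner, copier):
    # Compute the empty-input default on a copy produced by `copier`.
    combine_copy = copier(combiner)
    return combine_copy.extract_output(combine_copy.create_accumulator())


original = TupleOfCombiners(CountingSumFn(), CountingSumFn())

print(default_from(original, copy.copy))      # (0, 0)
print(original.combiners[0].calls)            # 1: the shallow copy leaked state

print(default_from(original, copy.deepcopy))  # (0, 0)
print(original.combiners[0].calls)            # still 1: the deep copy isolated it
```

Both copies return the same default. Only the deep copy guarantees that computing it leaves the user's combiner, and therefore the serialized pipeline, unchanged.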
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 499939)
Time Spent: 0.5h (was: 20m)
> Better expose BigQuery quota errors to users
> --------------------------------------------
>
> Key: BEAM-3376
> URL: https://issues.apache.org/jira/browse/BEAM-3376
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Reporter: Chamikara Madhusanka Jayalath
> Priority: P3
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Currently, users simply see the following error:
> (9e02ab71e036fcc3): Workflow failed. Causes: (9e02ab71e036fbad): S27:Save to
> BigQuery/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Read+Save to
> BigQuery/BatchLoads/MultiPartitionsReshuffle/GroupByKey/GroupByWindow+Save to
> BigQuery/BatchLoads/MultiPartitionsReshuffle/ExpandIterable+Save to
> BigQuery/BatchLoads/MultiPartitionsWriteTables/ParMultiDo(WriteTables)+Save
> to BigQuery/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map+Save
> to
> BigQuery/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign+Save
> to BigQuery/BatchLoads/MultiPartitionsWriteTables/GroupByKey/Reify+Save to
> BigQuery/BatchLoads/MultiPartitionsWriteTables/GroupByKey/Write+Save to
> BigQuery/BatchLoads/ReifyRenameInput/View.AsIterable/View.CreatePCollectionView/ParDo(ToIsmRecordForGlobalWindow)
> failed., (f88fcdac78f4b497): A work item was attempted 4 times without
> success. Each time the worker eventually lost contact with the service. The
> work item was attempted on: ...
> (there might be additional info in worker logs).
> We should update the BigQuery connector to raise an exception with a proper
> error message.
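The issue asks the BigQuery connector to surface quota failures with a clear message. The affected component is the Java connector, so this Python sketch only illustrates the idea. It inspects an error payload assumed to have the shape of a Google API JSON error response and builds a descriptive exception. The reason strings `quotaExceeded` and `rateLimitExceeded` follow BigQuery's documented error reasons. `BigQueryQuotaError`, `quota_error_for`, and the sample payload and table name are hypothetical, not part of Beam.

```python
class BigQueryQuotaError(RuntimeError):
    """Hypothetical exception type for quota and rate-limit failures."""


# Error reasons that BigQuery reports for quota and rate-limit problems.
QUOTA_REASONS = {"quotaExceeded", "rateLimitExceeded"}


def quota_error_for(error_response, table):
    """Return a BigQueryQuotaError if the payload reports a quota problem, else None.

    `error_response` is assumed to be a parsed JSON body of the form
    {"error": {"code": ..., "message": ..., "errors": [{"reason": ..., "message": ...}]}}.
    """
    error = error_response.get("error", {})
    for detail in error.get("errors", []):
        reason = detail.get("reason")
        if reason in QUOTA_REASONS:
            message = detail.get("message") or error.get("message") or "no details"
            return BigQueryQuotaError(
                f"BigQuery reported {reason} while writing to {table}: {message}. "
                "Check the project's BigQuery quotas or reduce the write rate.")
    return None


# Made-up sample payload for illustration only.
payload = {
    "error": {
        "code": 403,
        "message": "Quota exceeded",
        "errors": [{"reason": "quotaExceeded", "message": "Quota exceeded"}],
    }
}

err = quota_error_for(payload, "my-project:my_dataset.my_table")
if err is not None:
    print(err)  # a caller would `raise err` instead of retrying silently
```

Surfacing the reason and the target table in the exception message is what the generic "worker eventually lost contact with the service" error above does not do.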
--
This message was sent by Atlassian Jira
(v8.3.4#803005)