[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=442864&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-442864 ]
ASF GitHub Bot logged work on BEAM-2939:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Jun/20 16:06
Start Date: 09/Jun/20 16:06
Worklog Time Spent: 10m
Work Description: boyuanzz commented on a change in pull request #11941:
URL: https://github.com/apache/beam/pull/11941#discussion_r436861894
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -193,7 +193,21 @@
bundleFinalizer);
// Register the appropriate handlers.
- startFunctionRegistry.register(pTransformId, runner::startBundle);
+ switch (pTransform.getSpec().getUrn()) {
+ case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+ case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
+ case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+ startFunctionRegistry.register(pTransformId, runner::startBundle);
+ break;
+ case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
Review comment:
I'm wondering whether it would be helpful to have `startBundle` for
`PairWithRestriction` and `SplitRestriction` as well. Like the process fn,
`PairWithRestriction` and `SplitRestriction` also deal with (element,
restriction) pairs. For example, I do this in `KafkaIO` to initialize the
consumer once per bundle instead of once per element.
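A minimal plain-Java sketch of the per-bundle versus per-element trade-off described in the comment above. It assumes a hypothetical `ExpensiveConsumer` standing in for a Kafka consumer and a hand-rolled loop standing in for the harness; none of these names are Beam or Kafka APIs. Creating the resource on the per-element path builds one consumer per element, while creating it in a `startBundle`-style hook builds one per bundle.

```java
import java.util.List;

/** Hypothetical stand-in for an expensive client such as a Kafka consumer. */
class ExpensiveConsumer {
  static int created = 0; // counts how many consumers have been constructed

  ExpensiveConsumer() {
    created++;
  }

  /** Placeholder for a real lookup, e.g. fetching a starting offset. */
  long lookupStartOffset(String element) {
    return element.length();
  }
}

/** Hypothetical harness loop feeding bundles of elements to a restriction-pairing step. */
class BundleSimulation {
  /** No startBundle hook: a new consumer is built for every element. */
  static int runPerElement(List<List<String>> bundles) {
    ExpensiveConsumer.created = 0;
    for (List<String> bundle : bundles) {
      for (String element : bundle) {
        ExpensiveConsumer consumer = new ExpensiveConsumer();
        consumer.lookupStartOffset(element);
      }
    }
    return ExpensiveConsumer.created;
  }

  /** startBundle-style hook: one consumer is built per bundle and reused for its elements. */
  static int runPerBundle(List<List<String>> bundles) {
    ExpensiveConsumer.created = 0;
    for (List<String> bundle : bundles) {
      ExpensiveConsumer consumer = new ExpensiveConsumer(); // acts as "startBundle"
      for (String element : bundle) {
        consumer.lookupStartOffset(element);
      }
      // acts as "finishBundle": a real consumer would be closed here
    }
    return ExpensiveConsumer.created;
  }
}
```

With two bundles holding five elements in total, the per-element variant constructs five consumers and the per-bundle variant constructs two.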
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -784,9 +812,8 @@ private ByteString encodeProgress(double value) throws IOException {
default:
// no-op
}
- }
- private void startBundle() {
+ // TODO: Support caching state data across bundle boundaries.
Review comment:
Could you please add a JIRA here that describes this support in more
detail?
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -193,7 +193,21 @@
bundleFinalizer);
// Register the appropriate handlers.
- startFunctionRegistry.register(pTransformId, runner::startBundle);
+ switch (pTransform.getSpec().getUrn()) {
+ case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+ case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
+ case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+ startFunctionRegistry.register(pTransformId, runner::startBundle);
+ break;
+ case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
Review comment:
Now I get the idea of using `setup`/`teardown`. Thanks!
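A minimal plain-Java sketch of why `setup`/`teardown` can cover the consumer-initialization case from the earlier comment: a resource created in `setup` lives for the whole function instance and can be reused across several bundles, while `startBundle` runs once per bundle. `TrackedClient`, `RestrictionFn`, and `LifecycleDriver` are hypothetical names that only mimic the call order; they are not the harness or Beam API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical client that records whether it has been closed. */
class TrackedClient implements AutoCloseable {
  private boolean open = true;

  boolean isOpen() {
    return open;
  }

  @Override
  public void close() {
    open = false;
  }
}

/** Hypothetical function with lifecycle hooks; records every call in order. */
class RestrictionFn {
  final List<String> events = new ArrayList<>();
  TrackedClient client;

  void setup() {
    client = new TrackedClient(); // created once per instance, not per bundle
    events.add("setup");
  }

  void startBundle() {
    events.add("startBundle");
  }

  void process(String element) {
    if (client == null || !client.isOpen()) {
      throw new IllegalStateException("client not ready");
    }
    events.add("process:" + element);
  }

  void finishBundle() {
    events.add("finishBundle");
  }

  void teardown() {
    client.close(); // released once, after the last bundle
    events.add("teardown");
  }
}

class LifecycleDriver {
  /** Drives one instance through several bundles, as a runner might when it reuses instances. */
  static RestrictionFn run(List<List<String>> bundles) {
    RestrictionFn fn = new RestrictionFn();
    fn.setup();
    for (List<String> bundle : bundles) {
      fn.startBundle();
      for (String element : bundle) {
        fn.process(element);
      }
      fn.finishBundle();
    }
    fn.teardown();
    return fn;
  }
}
```

In the Beam Java SDK these hooks correspond to the `@Setup`, `@StartBundle`, `@FinishBundle`, and `@Teardown` annotations on a `DoFn`; the driver above only imitates their ordering.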
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 442864)
Time Spent: 38h 50m (was: 38h 40m)
> Fn API SDF support
> ------------------
>
> Key: BEAM-2939
> URL: https://issues.apache.org/jira/browse/BEAM-2939
> Project: Beam
> Issue Type: Improvement
> Components: beam-model
> Reporter: Henning Rohde
> Assignee: Luke Cwik
> Priority: P2
> Labels: portability, stale-assigned
> Time Spent: 38h 50m
> Remaining Estimate: 0h
>
> The Fn API should support streaming SDF. Detailed design TBD.
> Once design is ready, expand subtasks similarly to BEAM-2822.