[ https://issues.apache.org/jira/browse/BEAM-4654?focusedWorklogId=120101&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120101 ]
ASF GitHub Bot logged work on BEAM-4654:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Jul/18 22:13
Start Date: 06/Jul/18 22:13
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5883:
[BEAM-4654] Treat timers as PCollections within proto representation in the Java SDK.
URL: https://github.com/apache/beam/pull/5883#discussion_r200781061
##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##########
@@ -88,39 +91,72 @@
"urn:beam:windowmappingfn:javasdk:0.1";
/** A {@link TransformPayloadTranslator} for {@link ParDo}. */
- public static class ParDoPayloadTranslator
- implements TransformPayloadTranslator<MultiOutput<?, ?>> {
- public static TransformPayloadTranslator create() {
- return new ParDoPayloadTranslator();
+ public static class ParDoTranslator implements TransformTranslator<MultiOutput<?, ?>> {
+
+ public static TransformTranslator create() {
+ return new ParDoTranslator();
}
- private ParDoPayloadTranslator() {}
+ private ParDoTranslator() {}
@Override
public String getUrn(ParDo.MultiOutput<?, ?> transform) {
return PAR_DO_TRANSFORM_URN;
}
@Override
- public FunctionSpec translate(
- AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components)
- throws IOException {
- ParDoPayload payload =
- translateParDo(transform.getTransform(), transform.getPipeline(), components);
- return RunnerApi.FunctionSpec.newBuilder()
- .setUrn(PAR_DO_TRANSFORM_URN)
- .setPayload(payload.toByteString())
- .build();
+ public boolean canTranslate(PTransform<?, ?> pTransform) {
+ return pTransform instanceof ParDo.MultiOutput;
}
- /** Registers {@link ParDoPayloadTranslator}. */
- @AutoService(TransformPayloadTranslatorRegistrar.class)
- public static class Registrar implements TransformPayloadTranslatorRegistrar {
- @Override
- public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
- getTransformPayloadTranslators() {
- return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator());
+ @Override
+ public RunnerApi.PTransform translate(
+ AppliedPTransform<?, ?, ?> appliedPTransform,
+ List<AppliedPTransform<?, ?, ?>> subtransforms,
+ SdkComponents components)
+ throws IOException {
+ RunnerApi.PTransform.Builder builder =
+ PTransformTranslation.translateAppliedPTransform(
+ appliedPTransform, subtransforms, components);
+
+ ParDoPayload payload =
+ translateParDo(
+ (ParDo.MultiOutput) appliedPTransform.getTransform(),
+ appliedPTransform.getPipeline(),
+ components);
+ builder.setSpec(
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(PAR_DO_TRANSFORM_URN)
+ .setPayload(payload.toByteString())
+ .build());
+
+ String mainInputId = getMainInputId(builder, payload);
+ PCollection<KV<?, ?>> mainInput =
+ (PCollection) appliedPTransform.getInputs().get(new TupleTag(mainInputId));
+
+ // https://s.apache.org/beam-portability-timers
+ // Add a PCollection and coder for each timer. Also treat them as inputs and outputs.
+ for (String localTimerName : payload.getTimerSpecsMap().keySet()) {
+ PCollection<?> timerPCollection =
Review comment:
How is TimerSpec encoded in this PCollection?
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 120101)
Time Spent: 50m (was: 40m)
> Update pipeline translation for timers inside Java SDK
> ------------------------------------------------------
>
> Key: BEAM-4654
> URL: https://issues.apache.org/jira/browse/BEAM-4654
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Major
> Labels: portability
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Add the timer PCollection and treat timers as inputs/outputs of the ParDo
> PTransform.
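
A minimal sketch of the idea in the description above, using plain Java collections rather than the Beam API. Every name here (TimerWiringSketch, TransformSketch, addTimerPCollections, and the "<transformId>.<timerName>" id scheme) is hypothetical and chosen for illustration; the actual change in pull request #5883 operates on RunnerApi.PTransform builders and SdkComponents. The sketch also assumes that a single PCollection id per timer is reused on both the input and the output side.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TimerWiringSketch {

  /** Hypothetical stand-in for a transform proto: local name -> PCollection id. */
  static final class TransformSketch {
    final Map<String, String> inputs = new LinkedHashMap<>();
    final Map<String, String> outputs = new LinkedHashMap<>();
  }

  /**
   * For each timer declared by the ParDo, derive a PCollection id and register it
   * as both an input and an output of the transform. The id scheme is an assumption.
   */
  static void addTimerPCollections(
      String transformId, List<String> timerNames, TransformSketch transform) {
    for (String localTimerName : timerNames) {
      String timerPCollectionId = transformId + "." + localTimerName;
      transform.inputs.put(localTimerName, timerPCollectionId);
      transform.outputs.put(localTimerName, timerPCollectionId);
    }
  }

  public static void main(String[] args) {
    TransformSketch parDo = new TransformSketch();
    parDo.inputs.put("mainInput", "pc.in");
    parDo.outputs.put("mainOutput", "pc.out");

    // One timer named "expiry" becomes one extra input and one extra output.
    addTimerPCollections("parDo", Arrays.asList("expiry"), parDo);

    System.out.println("inputs:  " + parDo.inputs);
    System.out.println("outputs: " + parDo.outputs);
  }
}
```

Modelling a timer as a PCollection that the ParDo both consumes and produces lets a runner handle timers with the same machinery it already uses for data edges. How the timer's payload is encoded in that PCollection is the open question raised in the review comment above, and this sketch does not address it.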