[ https://issues.apache.org/jira/browse/BEAM-13193?focusedWorklogId=710745&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-710745 ]
ASF GitHub Bot logged work on BEAM-13193:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Jan/22 18:41
Start Date: 18/Jan/22 18:41
Worklog Time Spent: 10m
Work Description: y1chi commented on a change in pull request #16439:
URL: https://github.com/apache/beam/pull/16439#discussion_r787048391
##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
##########
@@ -72,87 +69,67 @@
public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(Context
context)
throws IOException {
- BeamFnDataWriteRunner<InputT> runner =
- new BeamFnDataWriteRunner<>(
- context.getBundleCacheSupplier(),
- context.getPTransformId(),
- context.getPTransform(),
- context.getProcessBundleInstructionIdSupplier(),
- context.getCoders(),
- context.getBeamFnDataClient(),
- context.getBeamFnStateClient());
- context.addStartBundleFunction(runner::registerForOutput);
+ RemoteGrpcPort port =
RemoteGrpcPortWrite.fromPTransform(context.getPTransform()).getPort();
+ RehydratedComponents components =
+ RehydratedComponents.forComponents(
+
Components.newBuilder().putAllCoders(context.getCoders()).build());
+ Coder<WindowedValue<InputT>> coder =
+ (Coder<WindowedValue<InputT>>)
+ CoderTranslation.fromProto(
+ context.getCoders().get(port.getCoderId()),
+ components,
+ new StateBackedIterableTranslationContext() {
+ @Override
+ public Supplier<Cache<?, ?>> getCache() {
+ return context.getBundleCacheSupplier();
+ }
+
+ @Override
+ public BeamFnStateClient getStateClient() {
+ return context.getBeamFnStateClient();
+ }
+
+ @Override
+ public Supplier<String> getCurrentInstructionId() {
+ return context.getProcessBundleInstructionIdSupplier();
+ }
+ });
+ BeamFnDataOutboundAggregator outboundAggregator =
+ context
+ .getOutboundAggregators()
+ .computeIfAbsent(
+ port.getApiServiceDescriptor(),
+ apiServiceDescriptor ->
+
context.getBeamFnDataClient().createOutboundAggregator(apiServiceDescriptor));
+ Supplier<LogicalEndpoint> endpointSupplier =
+ () ->
+ LogicalEndpoint.data(
+ context.getProcessBundleInstructionIdSupplier().get(),
context.getPTransformId());
+ BeamFnDataWriteRunner<InputT> runner = new
BeamFnDataWriteRunner<>(outboundAggregator);
+ context.addStartBundleFunction(() ->
runner.registerForOutput(endpointSupplier, coder));
+
context.addPCollectionConsumer(
getOnlyElement(context.getPTransform().getInputsMap().values()),
(FnDataReceiver) (FnDataReceiver<WindowedValue<InputT>>)
runner::consume,
- ((WindowedValueCoder<InputT>) runner.coder).getValueCoder());
+ ((WindowedValueCoder<InputT>) coder).getValueCoder());
- context.addFinishBundleFunction(runner::close);
return runner;
}
}
- private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
- private final String pTransformId;
- private final Coder<WindowedValue<InputT>> coder;
- private final BeamFnDataClient beamFnDataClientFactory;
- private final Supplier<String> processBundleInstructionIdSupplier;
-
- private CloseableFnDataReceiver<WindowedValue<InputT>> consumer;
-
- BeamFnDataWriteRunner(
- Supplier<Cache<?, ?>> cache,
- String pTransformId,
- RunnerApi.PTransform remoteWriteNode,
- Supplier<String> processBundleInstructionIdSupplier,
- Map<String, RunnerApi.Coder> coders,
- BeamFnDataClient beamFnDataClientFactory,
- BeamFnStateClient beamFnStateClient)
- throws IOException {
- this.pTransformId = pTransformId;
- RemoteGrpcPort port =
RemoteGrpcPortWrite.fromPTransform(remoteWriteNode).getPort();
- this.apiServiceDescriptor = port.getApiServiceDescriptor();
- this.beamFnDataClientFactory = beamFnDataClientFactory;
- this.processBundleInstructionIdSupplier =
processBundleInstructionIdSupplier;
-
- RehydratedComponents components =
-
RehydratedComponents.forComponents(Components.newBuilder().putAllCoders(coders).build());
- this.coder =
- (Coder<WindowedValue<InputT>>)
- CoderTranslation.fromProto(
- coders.get(port.getCoderId()),
- components,
- new StateBackedIterableTranslationContext() {
- @Override
- public Supplier<Cache<?, ?>> getCache() {
- return cache;
- }
-
- @Override
- public BeamFnStateClient getStateClient() {
- return beamFnStateClient;
- }
-
- @Override
- public Supplier<String> getCurrentInstructionId() {
- return processBundleInstructionIdSupplier;
- }
- });
- }
+ private final BeamFnDataOutboundAggregator outboundAggregator;
+ private LogicalEndpoint endpoint;
- public void registerForOutput() {
- consumer =
- beamFnDataClientFactory.send(
- apiServiceDescriptor,
- LogicalEndpoint.data(processBundleInstructionIdSupplier.get(),
pTransformId),
- coder);
+ BeamFnDataWriteRunner(BeamFnDataOutboundAggregator outboundAggregator)
throws IOException {
+ this.outboundAggregator = outboundAggregator;
}
- public void close() throws Exception {
- consumer.close();
+ public void registerForOutput(Supplier<LogicalEndpoint> outputLocation,
Coder<?> coder) {
+ endpoint = outputLocation.get();
Review comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 710745)
Time Spent: 11h 20m (was: 11h 10m)
> Optional data embedding in Fn API control request and response
> --------------------------------------------------------------
>
> Key: BEAM-13193
> URL: https://issues.apache.org/jira/browse/BEAM-13193
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Yichi Zhang
> Priority: P2
> Time Spent: 11h 20m
> Remaining Estimate: 0h
>
> https://docs.google.com/document/d/14p8Y_n4IY5n9L_I9l5x9lVGgml4ZzdCw645HldndCrw/edit
--
This message was sent by Atlassian Jira
(v8.20.1#820001)