y1chi commented on a change in pull request #16439:
URL: https://github.com/apache/beam/pull/16439#discussion_r787048391
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
##########
@@ -72,87 +69,67 @@
public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(Context
context)
throws IOException {
- BeamFnDataWriteRunner<InputT> runner =
- new BeamFnDataWriteRunner<>(
- context.getBundleCacheSupplier(),
- context.getPTransformId(),
- context.getPTransform(),
- context.getProcessBundleInstructionIdSupplier(),
- context.getCoders(),
- context.getBeamFnDataClient(),
- context.getBeamFnStateClient());
- context.addStartBundleFunction(runner::registerForOutput);
+ RemoteGrpcPort port =
RemoteGrpcPortWrite.fromPTransform(context.getPTransform()).getPort();
+ RehydratedComponents components =
+ RehydratedComponents.forComponents(
+
Components.newBuilder().putAllCoders(context.getCoders()).build());
+ Coder<WindowedValue<InputT>> coder =
+ (Coder<WindowedValue<InputT>>)
+ CoderTranslation.fromProto(
+ context.getCoders().get(port.getCoderId()),
+ components,
+ new StateBackedIterableTranslationContext() {
+ @Override
+ public Supplier<Cache<?, ?>> getCache() {
+ return context.getBundleCacheSupplier();
+ }
+
+ @Override
+ public BeamFnStateClient getStateClient() {
+ return context.getBeamFnStateClient();
+ }
+
+ @Override
+ public Supplier<String> getCurrentInstructionId() {
+ return context.getProcessBundleInstructionIdSupplier();
+ }
+ });
+ BeamFnDataOutboundAggregator outboundAggregator =
+ context
+ .getOutboundAggregators()
+ .computeIfAbsent(
+ port.getApiServiceDescriptor(),
+ apiServiceDescriptor ->
+
context.getBeamFnDataClient().createOutboundAggregator(apiServiceDescriptor));
+ Supplier<LogicalEndpoint> endpointSupplier =
+ () ->
+ LogicalEndpoint.data(
+ context.getProcessBundleInstructionIdSupplier().get(),
context.getPTransformId());
+ BeamFnDataWriteRunner<InputT> runner = new
BeamFnDataWriteRunner<>(outboundAggregator);
+ context.addStartBundleFunction(() ->
runner.registerForOutput(endpointSupplier, coder));
+
context.addPCollectionConsumer(
getOnlyElement(context.getPTransform().getInputsMap().values()),
(FnDataReceiver) (FnDataReceiver<WindowedValue<InputT>>)
runner::consume,
- ((WindowedValueCoder<InputT>) runner.coder).getValueCoder());
+ ((WindowedValueCoder<InputT>) coder).getValueCoder());
- context.addFinishBundleFunction(runner::close);
return runner;
}
}
- private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
- private final String pTransformId;
- private final Coder<WindowedValue<InputT>> coder;
- private final BeamFnDataClient beamFnDataClientFactory;
- private final Supplier<String> processBundleInstructionIdSupplier;
-
- private CloseableFnDataReceiver<WindowedValue<InputT>> consumer;
-
- BeamFnDataWriteRunner(
- Supplier<Cache<?, ?>> cache,
- String pTransformId,
- RunnerApi.PTransform remoteWriteNode,
- Supplier<String> processBundleInstructionIdSupplier,
- Map<String, RunnerApi.Coder> coders,
- BeamFnDataClient beamFnDataClientFactory,
- BeamFnStateClient beamFnStateClient)
- throws IOException {
- this.pTransformId = pTransformId;
- RemoteGrpcPort port =
RemoteGrpcPortWrite.fromPTransform(remoteWriteNode).getPort();
- this.apiServiceDescriptor = port.getApiServiceDescriptor();
- this.beamFnDataClientFactory = beamFnDataClientFactory;
- this.processBundleInstructionIdSupplier =
processBundleInstructionIdSupplier;
-
- RehydratedComponents components =
-
RehydratedComponents.forComponents(Components.newBuilder().putAllCoders(coders).build());
- this.coder =
- (Coder<WindowedValue<InputT>>)
- CoderTranslation.fromProto(
- coders.get(port.getCoderId()),
- components,
- new StateBackedIterableTranslationContext() {
- @Override
- public Supplier<Cache<?, ?>> getCache() {
- return cache;
- }
-
- @Override
- public BeamFnStateClient getStateClient() {
- return beamFnStateClient;
- }
-
- @Override
- public Supplier<String> getCurrentInstructionId() {
- return processBundleInstructionIdSupplier;
- }
- });
- }
+ private final BeamFnDataOutboundAggregator outboundAggregator;
+ private LogicalEndpoint endpoint;
- public void registerForOutput() {
- consumer =
- beamFnDataClientFactory.send(
- apiServiceDescriptor,
- LogicalEndpoint.data(processBundleInstructionIdSupplier.get(),
pTransformId),
- coder);
+ BeamFnDataWriteRunner(BeamFnDataOutboundAggregator outboundAggregator)
throws IOException {
+ this.outboundAggregator = outboundAggregator;
}
- public void close() throws Exception {
- consumer.close();
+ public void registerForOutput(Supplier<LogicalEndpoint> outputLocation,
Coder<?> coder) {
+ endpoint = outputLocation.get();
Review comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org