[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85182&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85182 ]
ASF GitHub Bot logged work on BEAM-3326:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Mar/18 09:59
Start Date: 28/Mar/18 09:59
Worklog Time Spent: 10m
Work Description: aljoscha commented on a change in pull request #4963:
[BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle
to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177666906
##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
##########
@@ -221,6 +220,135 @@ public void testNewBundleAndProcessElements() throws Exception {
WindowedValue.valueInGlobalWindow("eggs")));
}
+ @Test
+ public void handleCleanupWhenInputSenderFails() throws Exception {
+ String descriptorId1 = "descriptor1";
+ Exception testException = new Exception();
+
+ InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
+ CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class);
+
+ ProcessBundleDescriptor descriptor =
+ ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
+ CompletableFuture<InstructionResponse> processBundleResponseFuture =
+ new CompletableFuture<>();
+ when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+ .thenReturn(new CompletableFuture<>())
+ .thenReturn(processBundleResponseFuture);
+
+ FullWindowedValueCoder<String> coder =
+ FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
+ BundleProcessor<String> processor =
+ sdkHarnessClient.getProcessor(
+ descriptor, RemoteInputDestination.of(coder, Target.getDefaultInstance()));
+ when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver);
Review comment:
Off-topic: I noticed that the Javadoc for `FnDataService.receive()` still refers to the
"returned future", even though the method no longer returns a future.
Issue Time Tracking
-------------------
Worklog Id: (was: 85182)
Time Spent: 3h 50m (was: 3h 40m)
> Execute a Stage via the portability framework in the ReferenceRunner
> --------------------------------------------------------------------
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Thomas Groh
> Priority: Major
> Labels: portability
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs