lukecwik commented on a change in pull request #16051:
URL: https://github.com/apache/beam/pull/16051#discussion_r758826835
##########
File path:
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2.java
##########
@@ -104,71 +105,9 @@ public void awaitCompletion() throws Exception {
try {
while (true) {
BeamFnApi.Elements elements = queue.take();
- for (BeamFnApi.Elements.Data data : elements.getDataList()) {
- EndpointStatus<DataEndpoint<?>> endpoint =
- transformIdToDataEndpoint.get(data.getTransformId());
- if (endpoint == null) {
- throw new IllegalStateException(
- String.format(
- "Unable to find inbound data receiver for instruction %s
and transform %s.",
- data.getInstructionId(), data.getTransformId()));
- } else if (endpoint.isDone) {
- throw new IllegalStateException(
- String.format(
- "Received data after inbound data receiver is done for
instruction %s and transform %s.",
- data.getInstructionId(), data.getTransformId()));
- }
- InputStream inputStream = data.getData().newInput();
- Coder<Object> coder = (Coder<Object>) endpoint.endpoint.getCoder();
- FnDataReceiver<Object> receiver =
- (FnDataReceiver<Object>) endpoint.endpoint.getReceiver();
- while (inputStream.available() > 0) {
- receiver.accept(coder.decode(inputStream));
- }
- if (data.getIsLast()) {
- endpoint.isDone = true;
- numEndpointsThatAreIncomplete -= 1;
- if (numEndpointsThatAreIncomplete == 0) {
- return;
- }
- }
- }
-
- for (BeamFnApi.Elements.Timers timers : elements.getTimersList()) {
- Map<String, EndpointStatus<TimerEndpoint<?>>>
timerFamilyIdToEndpoints =
-
transformIdToTimerFamilyIdToTimerEndpoint.get(timers.getTransformId());
- if (timerFamilyIdToEndpoints == null) {
- throw new IllegalStateException(
- String.format(
- "Unable to find inbound timer receiver for instruction %s,
transform %s, and timer family %s.",
- timers.getInstructionId(), timers.getTransformId(),
timers.getTimerFamilyId()));
- }
- EndpointStatus<TimerEndpoint<?>> endpoint =
- timerFamilyIdToEndpoints.get(timers.getTimerFamilyId());
- if (endpoint == null) {
- throw new IllegalStateException(
- String.format(
- "Unable to find inbound timer receiver for instruction %s,
transform %s, and timer family %s.",
- timers.getInstructionId(), timers.getTransformId(),
timers.getTimerFamilyId()));
- } else if (endpoint.isDone) {
- throw new IllegalStateException(
- String.format(
- "Received timer after inbound timer receiver is done for
instruction %s, transform %s, and timer family %s.",
- timers.getInstructionId(), timers.getTransformId(),
timers.getTimerFamilyId()));
- }
- InputStream inputStream = timers.getTimers().newInput();
- Coder<Object> coder = (Coder<Object>) endpoint.endpoint.getCoder();
- FnDataReceiver<Object> receiver =
- (FnDataReceiver<Object>) endpoint.endpoint.getReceiver();
- while (inputStream.available() > 0) {
- receiver.accept(coder.decode(inputStream));
- }
- if (timers.getIsLast()) {
- numEndpointsThatAreIncomplete -= 1;
- if (numEndpointsThatAreIncomplete == 0) {
- return;
- }
- }
+ multiplexElements(elements);
+ if (numEndpointsThatAreIncomplete == 0) {
+ return;
}
Review comment:
```suggestion
if (multiplexElements(elements)) {
return;
}
```
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"),
empty());
}
+ private static final class SimpleRecordingDoFn extends DoFn<KV<String,
String>, String> {
+ private static final TupleTag<String> MAIN_OUTPUT_TAG = new
TupleTag<>("mainOutput");
+ private static final String TIMER_FAMILY_ID = "timer_family";
+
+ @TimerFamily(TIMER_FAMILY_ID)
+ private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+ static List<String> consumedData = new ArrayList<>();
+ static List<String> firedOnTimerCallbackTimerIds = new ArrayList<>();
+
+ @ProcessElement
+ public void processElement(ProcessContext context, BoundedWindow window) {}
+
+ @OnTimerFamily(TIMER_FAMILY_ID)
+ public void onTimer(@TimerId String timerId) {
+ firedOnTimerCallbackTimerIds.add(timerId);
+ }
+ }
+
+ private ProcessBundleHandler
setupProcessBundleHanlderForSimpleRecordingDoFn() throws Exception {
+ SimpleRecordingDoFn.consumedData.clear();
+ SimpleRecordingDoFn.firedOnTimerCallbackTimerIds.clear();
+ DoFnWithExecutionInformation doFnWithExecutionInformation =
+ DoFnWithExecutionInformation.of(
+ new SimpleRecordingDoFn(),
+ SimpleRecordingDoFn.MAIN_OUTPUT_TAG,
+ Collections.emptyMap(),
+ DoFnSchemaInformation.create());
+ RunnerApi.FunctionSpec functionSpec =
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+ .setPayload(
+ ByteString.copyFrom(
+
SerializableUtils.serializeToByteArray(doFnWithExecutionInformation)))
+ .build();
+ RunnerApi.ParDoPayload parDoPayload =
+ ParDoPayload.newBuilder()
+ .setDoFn(functionSpec)
+ .putTimerFamilySpecs(
+ "tfs-" + SimpleRecordingDoFn.TIMER_FAMILY_ID,
+ TimerFamilySpec.newBuilder()
+ .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
+ .setTimerFamilyCoderId("timer-coder")
+ .build())
+ .build();
+ BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+ ProcessBundleDescriptor.newBuilder()
+ .putTransforms(
+ "2L",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+ .putOutputs("2L-output", "2L-output-pc")
+ .build())
+ .putTransforms(
+ "3L",
+ PTransform.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(parDoPayload.toByteString()))
+ .putInputs("3L-input", "2L-output-pc")
+ .build())
+ .putPcollections(
+ "2L-output-pc",
+ PCollection.newBuilder()
+ .setWindowingStrategyId("window-strategy")
+ .setCoderId("2L-output-coder")
+ .build())
+ .putWindowingStrategies(
+ "window-strategy",
+ WindowingStrategy.newBuilder()
+ .setWindowCoderId("window-strategy-coder")
+ .setWindowFn(
+
FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1"))
+ .setOutputTime(OutputTime.Enum.END_OF_WINDOW)
+ .setAccumulationMode(AccumulationMode.Enum.ACCUMULATING)
+
.setTrigger(Trigger.newBuilder().setAlways(Always.getDefaultInstance()))
+ .setClosingBehavior(ClosingBehavior.Enum.EMIT_ALWAYS)
+ .setOnTimeBehavior(OnTimeBehavior.Enum.FIRE_ALWAYS)
+ .build())
+
.setTimerApiServiceDescriptor(ApiServiceDescriptor.newBuilder().setUrl("url").build())
+ .putCoders("string_coder",
CoderTranslation.toProto(StringUtf8Coder.of()).getCoder())
+ .putCoders(
+ "2L-output-coder",
+ Coder.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+ .addComponentCoderIds("string_coder")
+ .addComponentCoderIds("string_coder")
+ .build())
+ .putCoders(
+ "window-strategy-coder",
+ Coder.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+ .build())
+ .build())
+ .putCoders(
+ "timer-coder",
+ Coder.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN))
+ .addComponentCoderIds("string_coder")
+ .addComponentCoderIds("window-strategy-coder")
+ .build())
+ .build();
+ Map<String, Message> fnApiRegistry = ImmutableMap.of("1L",
processBundleDescriptor);
+
+ Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
+ Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
+ urnToPTransformRunnerFactoryMap.put(
+ DATA_INPUT_URN,
+ (PTransformRunnerFactory<Object>)
+ (context) -> {
+ context.addIncomingDataEndpoint(
+ ApiServiceDescriptor.getDefaultInstance(),
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
+ (input) -> {
+ SimpleRecordingDoFn.consumedData.add(input.getValue());
+ });
+ return null;
+ });
+
+ Mockito.doAnswer(
+ (invocation) -> {
+ // A no op consumer for timers.
+ return new CloseableFnDataReceiver() {
+ @Override
+ public void accept(Object input) throws Exception {}
+
+ @Override
+ public void flush() throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+ };
+ })
+ .when(beamFnDataClient)
+ .send(any(), any(), any());
+
+ return new ProcessBundleHandler(
+ PipelineOptionsFactory.create(),
+ Collections.emptySet(),
+ fnApiRegistry::get,
+ beamFnDataClient,
+ null /* beamFnStateClient */,
+ null /* finalizeBundleHandler */,
+ new ShortIdMap(),
+ urnToPTransformRunnerFactoryMap,
+ new BundleProcessorCache());
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedData = ByteString.newOutput();
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("",
"data"), encodedData);
+ ByteString.Output encodedTimer = ByteString.newOutput();
+ Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+ .encode(
+ Timer.of(
+ "",
+ "timer_id",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ Instant.ofEpochMilli(1L),
+ Instant.ofEpochMilli(1L),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ encodedTimer);
+
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+ .setData(encodedData.toByteString())
+ .build())
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+ .setIsLast(true)
+ .build())
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+ .setTimers(encodedTimer.toByteString())
+ .build())
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+ .setIsLast(true)
+ .build())
+ .build()))
+ .build());
+ handler.shutdown();
+ assertThat(SimpleRecordingDoFn.consumedData, contains("data"));
+ assertThat(SimpleRecordingDoFn.firedOnTimerCallbackTimerIds,
contains("timer_id"));
+ // Register timer family outbound receiver.
+ verify(beamFnDataClient).send(any(), any(), any());
+ verifyNoMoreInteractions(beamFnDataClient);
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsWithMalformedData() throws
Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedData = ByteString.newOutput();
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("",
"data"), encodedData);
+
+ assertThrows(
+ "Expect java.lang.IllegalStateException: Unable to find inbound data
receiver for"
+ + " instruction 998L and transform 3L. But was not thrown or
Exception did not match.",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+
.setData(encodedData.toByteString())
+ .build())
+ .build()))
+ .build()));
+ assertThrows(
+ "Expect java.lang.RuntimeException: Elements embedded in
ProcessBundleRequest are "
+ + "incomplete. But was not thrown or Exception did not match.",
+ RuntimeException.class,
+ () ->
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+
.setData(encodedData.toByteString())
+ .build())
+ .build()))
+ .build()));
+ handler.shutdown();
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsWithMalformedTimers() throws
Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedTimer = ByteString.newOutput();
+ Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+ .encode(
+ Timer.of(
+ "",
+ "timer_id",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ Instant.ofEpochMilli(1L),
+ Instant.ofEpochMilli(1L),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ encodedTimer);
+
+ assertThrows(
+ "Expect java.lang.IllegalStateException: Unable to find inbound timer
receiver "
+ + "for instruction 998L, transform 4L, and timer family
tfs-timer_family. But was not"
+ + " thrown or Exception did not match.",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("4L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+
.setTimers(encodedTimer.toByteString())
+ .build())
+ .build()))
+ .build()));
+ assertThrows(
+ "Expect java.lang.IllegalStateException: Unable to find inbound timer
receiver "
+ + "for instruction 998L, transform 3L, and timer family
tfs-not_declared_id. But was "
+ + "not thrown or Exception did not match.",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ "not_declared_id")
+
.setTimers(encodedTimer.toByteString())
+ .build())
+ .build()))
+ .build()));
+ assertThrows(
+ "Expect java.lang.RuntimeException: Elements embedded in
ProcessBundleRequest are"
Review comment:
Why does this assertion message end with "But was not thrown or Exception did not match"? That suffix makes the failure message confusing.
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"),
empty());
}
+ private static final class SimpleRecordingDoFn extends DoFn<KV<String,
String>, String> {
+ private static final TupleTag<String> MAIN_OUTPUT_TAG = new
TupleTag<>("mainOutput");
+ private static final String TIMER_FAMILY_ID = "timer_family";
+
+ @TimerFamily(TIMER_FAMILY_ID)
+ private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+ static List<String> consumedData = new ArrayList<>();
+ static List<String> firedOnTimerCallbackTimerIds = new ArrayList<>();
+
+ @ProcessElement
+ public void processElement(ProcessContext context, BoundedWindow window) {}
+
+ @OnTimerFamily(TIMER_FAMILY_ID)
+ public void onTimer(@TimerId String timerId) {
+ firedOnTimerCallbackTimerIds.add(timerId);
+ }
+ }
+
+ private ProcessBundleHandler
setupProcessBundleHanlderForSimpleRecordingDoFn() throws Exception {
+ SimpleRecordingDoFn.consumedData.clear();
+ SimpleRecordingDoFn.firedOnTimerCallbackTimerIds.clear();
+ DoFnWithExecutionInformation doFnWithExecutionInformation =
+ DoFnWithExecutionInformation.of(
+ new SimpleRecordingDoFn(),
+ SimpleRecordingDoFn.MAIN_OUTPUT_TAG,
+ Collections.emptyMap(),
+ DoFnSchemaInformation.create());
+ RunnerApi.FunctionSpec functionSpec =
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+ .setPayload(
+ ByteString.copyFrom(
+
SerializableUtils.serializeToByteArray(doFnWithExecutionInformation)))
+ .build();
+ RunnerApi.ParDoPayload parDoPayload =
+ ParDoPayload.newBuilder()
+ .setDoFn(functionSpec)
+ .putTimerFamilySpecs(
+ "tfs-" + SimpleRecordingDoFn.TIMER_FAMILY_ID,
+ TimerFamilySpec.newBuilder()
+ .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
+ .setTimerFamilyCoderId("timer-coder")
+ .build())
+ .build();
+ BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+ ProcessBundleDescriptor.newBuilder()
+ .putTransforms(
+ "2L",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+ .putOutputs("2L-output", "2L-output-pc")
+ .build())
+ .putTransforms(
+ "3L",
+ PTransform.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(parDoPayload.toByteString()))
+ .putInputs("3L-input", "2L-output-pc")
+ .build())
+ .putPcollections(
+ "2L-output-pc",
+ PCollection.newBuilder()
+ .setWindowingStrategyId("window-strategy")
+ .setCoderId("2L-output-coder")
+ .build())
+ .putWindowingStrategies(
+ "window-strategy",
+ WindowingStrategy.newBuilder()
+ .setWindowCoderId("window-strategy-coder")
+ .setWindowFn(
+
FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1"))
+ .setOutputTime(OutputTime.Enum.END_OF_WINDOW)
+ .setAccumulationMode(AccumulationMode.Enum.ACCUMULATING)
+
.setTrigger(Trigger.newBuilder().setAlways(Always.getDefaultInstance()))
+ .setClosingBehavior(ClosingBehavior.Enum.EMIT_ALWAYS)
+ .setOnTimeBehavior(OnTimeBehavior.Enum.FIRE_ALWAYS)
+ .build())
+
.setTimerApiServiceDescriptor(ApiServiceDescriptor.newBuilder().setUrl("url").build())
+ .putCoders("string_coder",
CoderTranslation.toProto(StringUtf8Coder.of()).getCoder())
+ .putCoders(
+ "2L-output-coder",
+ Coder.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+ .addComponentCoderIds("string_coder")
+ .addComponentCoderIds("string_coder")
+ .build())
+ .putCoders(
+ "window-strategy-coder",
+ Coder.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+ .build())
+ .build())
+ .putCoders(
+ "timer-coder",
+ Coder.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN))
+ .addComponentCoderIds("string_coder")
+ .addComponentCoderIds("window-strategy-coder")
+ .build())
+ .build();
+ Map<String, Message> fnApiRegistry = ImmutableMap.of("1L",
processBundleDescriptor);
+
+ Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
+ Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
+ urnToPTransformRunnerFactoryMap.put(
+ DATA_INPUT_URN,
+ (PTransformRunnerFactory<Object>)
+ (context) -> {
+ context.addIncomingDataEndpoint(
+ ApiServiceDescriptor.getDefaultInstance(),
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
+ (input) -> {
+ SimpleRecordingDoFn.consumedData.add(input.getValue());
+ });
+ return null;
+ });
+
+ Mockito.doAnswer(
+ (invocation) -> {
+ // A no op consumer for timers.
+ return new CloseableFnDataReceiver() {
+ @Override
+ public void accept(Object input) throws Exception {}
+
+ @Override
+ public void flush() throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+ };
+ })
+ .when(beamFnDataClient)
+ .send(any(), any(), any());
+
+ return new ProcessBundleHandler(
+ PipelineOptionsFactory.create(),
+ Collections.emptySet(),
+ fnApiRegistry::get,
+ beamFnDataClient,
+ null /* beamFnStateClient */,
+ null /* finalizeBundleHandler */,
+ new ShortIdMap(),
+ urnToPTransformRunnerFactoryMap,
+ new BundleProcessorCache());
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedData = ByteString.newOutput();
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("",
"data"), encodedData);
+ ByteString.Output encodedTimer = ByteString.newOutput();
+ Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+ .encode(
+ Timer.of(
+ "",
+ "timer_id",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ Instant.ofEpochMilli(1L),
+ Instant.ofEpochMilli(1L),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ encodedTimer);
+
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+ .setData(encodedData.toByteString())
+ .build())
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+ .setIsLast(true)
+ .build())
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+ .setTimers(encodedTimer.toByteString())
+ .build())
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+ .setIsLast(true)
+ .build())
+ .build()))
+ .build());
+ handler.shutdown();
+ assertThat(SimpleRecordingDoFn.consumedData, contains("data"));
+ assertThat(SimpleRecordingDoFn.firedOnTimerCallbackTimerIds,
contains("timer_id"));
+ // Register timer family outbound receiver.
+ verify(beamFnDataClient).send(any(), any(), any());
+ verifyNoMoreInteractions(beamFnDataClient);
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsWithMalformedData() throws
Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedData = ByteString.newOutput();
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("",
"data"), encodedData);
+
+ assertThrows(
+ "Expect java.lang.IllegalStateException: Unable to find inbound data
receiver for"
+ + " instruction 998L and transform 3L. But was not thrown or
Exception did not match.",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+
.setData(encodedData.toByteString())
+ .build())
+ .build()))
+ .build()));
+ assertThrows(
+ "Expect java.lang.RuntimeException: Elements embedded in
ProcessBundleRequest are "
+ + "incomplete. But was not thrown or Exception did not match.",
Review comment:
Why does this assertion message end with "But was not thrown or Exception did not match"? That suffix makes the failure message confusing.
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"),
empty());
}
+ private static final class SimpleRecordingDoFn extends DoFn<KV<String,
String>, String> {
+ private static final TupleTag<String> MAIN_OUTPUT_TAG = new
TupleTag<>("mainOutput");
+ private static final String TIMER_FAMILY_ID = "timer_family";
+
+ @TimerFamily(TIMER_FAMILY_ID)
+ private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+ static List<String> consumedData = new ArrayList<>();
+ static List<String> firedOnTimerCallbackTimerIds = new ArrayList<>();
+
+ @ProcessElement
+ public void processElement(ProcessContext context, BoundedWindow window) {}
+
+ @OnTimerFamily(TIMER_FAMILY_ID)
+ public void onTimer(@TimerId String timerId) {
+ firedOnTimerCallbackTimerIds.add(timerId);
+ }
+ }
+
+ private ProcessBundleHandler
setupProcessBundleHanlderForSimpleRecordingDoFn() throws Exception {
+ SimpleRecordingDoFn.consumedData.clear();
+ SimpleRecordingDoFn.firedOnTimerCallbackTimerIds.clear();
+ DoFnWithExecutionInformation doFnWithExecutionInformation =
+ DoFnWithExecutionInformation.of(
+ new SimpleRecordingDoFn(),
+ SimpleRecordingDoFn.MAIN_OUTPUT_TAG,
+ Collections.emptyMap(),
+ DoFnSchemaInformation.create());
+ RunnerApi.FunctionSpec functionSpec =
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+ .setPayload(
+ ByteString.copyFrom(
+
SerializableUtils.serializeToByteArray(doFnWithExecutionInformation)))
+ .build();
+ RunnerApi.ParDoPayload parDoPayload =
+ ParDoPayload.newBuilder()
+ .setDoFn(functionSpec)
+ .putTimerFamilySpecs(
+ "tfs-" + SimpleRecordingDoFn.TIMER_FAMILY_ID,
+ TimerFamilySpec.newBuilder()
+ .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
+ .setTimerFamilyCoderId("timer-coder")
+ .build())
+ .build();
+ BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+ ProcessBundleDescriptor.newBuilder()
+ .putTransforms(
+ "2L",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+ .putOutputs("2L-output", "2L-output-pc")
+ .build())
+ .putTransforms(
+ "3L",
+ PTransform.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(parDoPayload.toByteString()))
+ .putInputs("3L-input", "2L-output-pc")
+ .build())
+ .putPcollections(
+ "2L-output-pc",
+ PCollection.newBuilder()
+ .setWindowingStrategyId("window-strategy")
+ .setCoderId("2L-output-coder")
+ .build())
+ .putWindowingStrategies(
+ "window-strategy",
+ WindowingStrategy.newBuilder()
+ .setWindowCoderId("window-strategy-coder")
+ .setWindowFn(
+
FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1"))
+ .setOutputTime(OutputTime.Enum.END_OF_WINDOW)
+ .setAccumulationMode(AccumulationMode.Enum.ACCUMULATING)
+
.setTrigger(Trigger.newBuilder().setAlways(Always.getDefaultInstance()))
+ .setClosingBehavior(ClosingBehavior.Enum.EMIT_ALWAYS)
+ .setOnTimeBehavior(OnTimeBehavior.Enum.FIRE_ALWAYS)
+ .build())
+
.setTimerApiServiceDescriptor(ApiServiceDescriptor.newBuilder().setUrl("url").build())
+ .putCoders("string_coder",
CoderTranslation.toProto(StringUtf8Coder.of()).getCoder())
+ .putCoders(
+ "2L-output-coder",
+ Coder.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+ .addComponentCoderIds("string_coder")
+ .addComponentCoderIds("string_coder")
+ .build())
+ .putCoders(
+ "window-strategy-coder",
+ Coder.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+ .build())
+ .build())
+ .putCoders(
+ "timer-coder",
+ Coder.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN))
+ .addComponentCoderIds("string_coder")
+ .addComponentCoderIds("window-strategy-coder")
+ .build())
+ .build();
+ Map<String, Message> fnApiRegistry = ImmutableMap.of("1L",
processBundleDescriptor);
+
+ Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
+ Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
+ urnToPTransformRunnerFactoryMap.put(
+ DATA_INPUT_URN,
+ (PTransformRunnerFactory<Object>)
+ (context) -> {
+ context.addIncomingDataEndpoint(
+ ApiServiceDescriptor.getDefaultInstance(),
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
+ (input) -> {
+ SimpleRecordingDoFn.consumedData.add(input.getValue());
+ });
+ return null;
+ });
+
+ Mockito.doAnswer(
+ (invocation) -> {
+ // A no op consumer for timers.
+ return new CloseableFnDataReceiver() {
+ @Override
+ public void accept(Object input) throws Exception {}
+
+ @Override
+ public void flush() throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+ };
+ })
+ .when(beamFnDataClient)
+ .send(any(), any(), any());
+
+ return new ProcessBundleHandler(
+ PipelineOptionsFactory.create(),
+ Collections.emptySet(),
+ fnApiRegistry::get,
+ beamFnDataClient,
+ null /* beamFnStateClient */,
+ null /* finalizeBundleHandler */,
+ new ShortIdMap(),
+ urnToPTransformRunnerFactoryMap,
+ new BundleProcessorCache());
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedData = ByteString.newOutput();
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("",
"data"), encodedData);
+ ByteString.Output encodedTimer = ByteString.newOutput();
+ Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+ .encode(
+ Timer.of(
+ "",
+ "timer_id",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ Instant.ofEpochMilli(1L),
+ Instant.ofEpochMilli(1L),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ encodedTimer);
+
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+ .setData(encodedData.toByteString())
+ .build())
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+ .setIsLast(true)
+ .build())
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+ .setTimers(encodedTimer.toByteString())
+ .build())
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+ .setIsLast(true)
+ .build())
+ .build()))
+ .build());
+ handler.shutdown();
+ assertThat(SimpleRecordingDoFn.consumedData, contains("data"));
+ assertThat(SimpleRecordingDoFn.firedOnTimerCallbackTimerIds,
contains("timer_id"));
+ // Register timer family outbound receiver.
+ verify(beamFnDataClient).send(any(), any(), any());
+ verifyNoMoreInteractions(beamFnDataClient);
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsWithMalformedData() throws
Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedData = ByteString.newOutput();
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("",
"data"), encodedData);
+
+ assertThrows(
+ "Expect java.lang.IllegalStateException: Unable to find inbound data
receiver for"
+ + " instruction 998L and transform 3L. But was not thrown or
Exception did not match.",
Review comment:
Why does this assertion message end with "But was not thrown or Exception did not match"?
The failure message makes it look as though some other, unexpected outcome occurred,
rather than the specific failure this assertion was added to check.
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -458,7 +458,16 @@ public BundleFinalizer getBundleFinalizer() {
startFunction.run();
}
- if
(!bundleProcessor.getInboundEndpointApiServiceDescriptors().isEmpty()) {
+ if (request.getProcessBundle().hasElements()) {
+ boolean inputFinished =
+ bundleProcessor
+ .getInboundObserver()
+
.multiplexElements(request.getProcessBundle().getElements());
+ if (!inputFinished) {
+ throw new RuntimeException(
+ "Elements embedded in ProcessBundleRequest are incomplete.");
Review comment:
nit: In addition to improving the message, it would be nice to list which
data and timer inputs are missing their stream terminators.
```suggestion
"Elements embedded in ProcessBundleRequest do not contain
stream terminators for all data and timer inputs.");
```
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"),
empty());
}
+ private static final class SimpleRecordingDoFn extends DoFn<KV<String,
String>, String> {
+ private static final TupleTag<String> MAIN_OUTPUT_TAG = new
TupleTag<>("mainOutput");
+ private static final String TIMER_FAMILY_ID = "timer_family";
+
+ @TimerFamily(TIMER_FAMILY_ID)
+ private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+ static List<String> consumedData = new ArrayList<>();
+ static List<String> firedOnTimerCallbackTimerIds = new ArrayList<>();
+
+ @ProcessElement
+ public void processElement(ProcessContext context, BoundedWindow window) {}
+
+ @OnTimerFamily(TIMER_FAMILY_ID)
+ public void onTimer(@TimerId String timerId) {
+ firedOnTimerCallbackTimerIds.add(timerId);
+ }
+ }
+
+ private ProcessBundleHandler
setupProcessBundleHanlderForSimpleRecordingDoFn() throws Exception {
+ SimpleRecordingDoFn.consumedData.clear();
+ SimpleRecordingDoFn.firedOnTimerCallbackTimerIds.clear();
+ DoFnWithExecutionInformation doFnWithExecutionInformation =
+ DoFnWithExecutionInformation.of(
+ new SimpleRecordingDoFn(),
+ SimpleRecordingDoFn.MAIN_OUTPUT_TAG,
+ Collections.emptyMap(),
+ DoFnSchemaInformation.create());
+ RunnerApi.FunctionSpec functionSpec =
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+ .setPayload(
+ ByteString.copyFrom(
+
SerializableUtils.serializeToByteArray(doFnWithExecutionInformation)))
+ .build();
+ RunnerApi.ParDoPayload parDoPayload =
+ ParDoPayload.newBuilder()
+ .setDoFn(functionSpec)
+ .putTimerFamilySpecs(
+ "tfs-" + SimpleRecordingDoFn.TIMER_FAMILY_ID,
+ TimerFamilySpec.newBuilder()
+ .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
+ .setTimerFamilyCoderId("timer-coder")
+ .build())
+ .build();
+ BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+ ProcessBundleDescriptor.newBuilder()
+ .putTransforms(
+ "2L",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+ .putOutputs("2L-output", "2L-output-pc")
+ .build())
+ .putTransforms(
+ "3L",
+ PTransform.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(parDoPayload.toByteString()))
+ .putInputs("3L-input", "2L-output-pc")
+ .build())
+ .putPcollections(
+ "2L-output-pc",
+ PCollection.newBuilder()
+ .setWindowingStrategyId("window-strategy")
+ .setCoderId("2L-output-coder")
+ .build())
+ .putWindowingStrategies(
+ "window-strategy",
+ WindowingStrategy.newBuilder()
+ .setWindowCoderId("window-strategy-coder")
+ .setWindowFn(
+
FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1"))
+ .setOutputTime(OutputTime.Enum.END_OF_WINDOW)
+ .setAccumulationMode(AccumulationMode.Enum.ACCUMULATING)
+
.setTrigger(Trigger.newBuilder().setAlways(Always.getDefaultInstance()))
+ .setClosingBehavior(ClosingBehavior.Enum.EMIT_ALWAYS)
+ .setOnTimeBehavior(OnTimeBehavior.Enum.FIRE_ALWAYS)
+ .build())
+
.setTimerApiServiceDescriptor(ApiServiceDescriptor.newBuilder().setUrl("url").build())
+ .putCoders("string_coder",
CoderTranslation.toProto(StringUtf8Coder.of()).getCoder())
+ .putCoders(
+ "2L-output-coder",
+ Coder.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+ .addComponentCoderIds("string_coder")
+ .addComponentCoderIds("string_coder")
+ .build())
+ .putCoders(
+ "window-strategy-coder",
+ Coder.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+ .build())
+ .build())
+ .putCoders(
+ "timer-coder",
+ Coder.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN))
+ .addComponentCoderIds("string_coder")
+ .addComponentCoderIds("window-strategy-coder")
+ .build())
+ .build();
+ Map<String, Message> fnApiRegistry = ImmutableMap.of("1L",
processBundleDescriptor);
+
+ Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
+ Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
+ urnToPTransformRunnerFactoryMap.put(
+ DATA_INPUT_URN,
+ (PTransformRunnerFactory<Object>)
+ (context) -> {
+ context.addIncomingDataEndpoint(
+ ApiServiceDescriptor.getDefaultInstance(),
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
+ (input) -> {
+ SimpleRecordingDoFn.consumedData.add(input.getValue());
+ });
+ return null;
+ });
+
+ Mockito.doAnswer(
+ (invocation) -> {
+ // A no op consumer for timers.
+ return new CloseableFnDataReceiver() {
+ @Override
+ public void accept(Object input) throws Exception {}
+
+ @Override
+ public void flush() throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+ };
+ })
+ .when(beamFnDataClient)
+ .send(any(), any(), any());
+
+ return new ProcessBundleHandler(
+ PipelineOptionsFactory.create(),
+ Collections.emptySet(),
+ fnApiRegistry::get,
+ beamFnDataClient,
+ null /* beamFnStateClient */,
+ null /* finalizeBundleHandler */,
+ new ShortIdMap(),
+ urnToPTransformRunnerFactoryMap,
+ new BundleProcessorCache());
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedData = ByteString.newOutput();
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("",
"data"), encodedData);
+ ByteString.Output encodedTimer = ByteString.newOutput();
+ Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+ .encode(
+ Timer.of(
+ "",
+ "timer_id",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ Instant.ofEpochMilli(1L),
+ Instant.ofEpochMilli(1L),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ encodedTimer);
+
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+ .setData(encodedData.toByteString())
+ .build())
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+ .setIsLast(true)
+ .build())
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+ .setTimers(encodedTimer.toByteString())
+ .build())
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+ .setIsLast(true)
+ .build())
+ .build()))
+ .build());
+ handler.shutdown();
+ assertThat(SimpleRecordingDoFn.consumedData, contains("data"));
+ assertThat(SimpleRecordingDoFn.firedOnTimerCallbackTimerIds,
contains("timer_id"));
+ // Register timer family outbound receiver.
+ verify(beamFnDataClient).send(any(), any(), any());
+ verifyNoMoreInteractions(beamFnDataClient);
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsWithMalformedData() throws
Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedData = ByteString.newOutput();
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("",
"data"), encodedData);
+
+ assertThrows(
+ "Expect java.lang.IllegalStateException: Unable to find inbound data
receiver for"
+ + " instruction 998L and transform 3L. But was not thrown or
Exception did not match.",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+
.setData(encodedData.toByteString())
+ .build())
+ .build()))
+ .build()));
+ assertThrows(
+ "Expect java.lang.RuntimeException: Elements embedded in
ProcessBundleRequest are "
+ + "incomplete. But was not thrown or Exception did not match.",
+ RuntimeException.class,
+ () ->
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+
.setData(encodedData.toByteString())
+ .build())
+ .build()))
+ .build()));
+ handler.shutdown();
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsWithMalformedTimers() throws
Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedTimer = ByteString.newOutput();
+ Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+ .encode(
+ Timer.of(
+ "",
+ "timer_id",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ Instant.ofEpochMilli(1L),
+ Instant.ofEpochMilli(1L),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ encodedTimer);
+
+ assertThrows(
+ "Expect java.lang.IllegalStateException: Unable to find inbound timer
receiver "
+ + "for instruction 998L, transform 4L, and timer family
tfs-timer_family. But was not"
+ + " thrown or Exception did not match.",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("4L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+
.setTimers(encodedTimer.toByteString())
+ .build())
+ .build()))
+ .build()));
+ assertThrows(
+ "Expect java.lang.IllegalStateException: Unable to find inbound timer
receiver "
Review comment:
Why "But was not thrown or Exception did not match"? Please describe the expected failure directly instead.
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"),
empty());
}
+ private static final class SimpleRecordingDoFn extends DoFn<KV<String,
String>, String> {
+ private static final TupleTag<String> MAIN_OUTPUT_TAG = new
TupleTag<>("mainOutput");
+ private static final String TIMER_FAMILY_ID = "timer_family";
+
+ @TimerFamily(TIMER_FAMILY_ID)
+ private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+ static List<String> consumedData = new ArrayList<>();
Review comment:
It would be better to forward the inputs as outputs and validate your tests
against those outputs. Relying on static fields can leave tests in a bad
state when something goes wrong.
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -790,6 +811,367 @@ public void testPTransformStartExceptionsArePropagated() {
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"),
empty());
}
+ private static final class SimpleRecordingDoFn extends DoFn<KV<String,
String>, String> {
+ private static final TupleTag<String> MAIN_OUTPUT_TAG = new
TupleTag<>("mainOutput");
+ private static final String TIMER_FAMILY_ID = "timer_family";
+
+ @TimerFamily(TIMER_FAMILY_ID)
+ private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+ static List<String> consumedData = new ArrayList<>();
+ static List<String> firedOnTimerCallbackTimerIds = new ArrayList<>();
+
+ @ProcessElement
+ public void processElement(ProcessContext context, BoundedWindow window) {}
+
+ @OnTimerFamily(TIMER_FAMILY_ID)
+ public void onTimer(@TimerId String timerId) {
+ firedOnTimerCallbackTimerIds.add(timerId);
+ }
+ }
+
+ private ProcessBundleHandler
setupProcessBundleHanlderForSimpleRecordingDoFn() throws Exception {
+ SimpleRecordingDoFn.consumedData.clear();
+ SimpleRecordingDoFn.firedOnTimerCallbackTimerIds.clear();
+ DoFnWithExecutionInformation doFnWithExecutionInformation =
+ DoFnWithExecutionInformation.of(
+ new SimpleRecordingDoFn(),
+ SimpleRecordingDoFn.MAIN_OUTPUT_TAG,
+ Collections.emptyMap(),
+ DoFnSchemaInformation.create());
+ RunnerApi.FunctionSpec functionSpec =
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+ .setPayload(
+ ByteString.copyFrom(
+
SerializableUtils.serializeToByteArray(doFnWithExecutionInformation)))
+ .build();
+ RunnerApi.ParDoPayload parDoPayload =
+ ParDoPayload.newBuilder()
+ .setDoFn(functionSpec)
+ .putTimerFamilySpecs(
+ "tfs-" + SimpleRecordingDoFn.TIMER_FAMILY_ID,
+ TimerFamilySpec.newBuilder()
+ .setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME)
+ .setTimerFamilyCoderId("timer-coder")
+ .build())
+ .build();
+ BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+ ProcessBundleDescriptor.newBuilder()
+ .putTransforms(
+ "2L",
+ PTransform.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+ .putOutputs("2L-output", "2L-output-pc")
+ .build())
+ .putTransforms(
+ "3L",
+ PTransform.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(parDoPayload.toByteString()))
+ .putInputs("3L-input", "2L-output-pc")
+ .build())
+ .putPcollections(
+ "2L-output-pc",
+ PCollection.newBuilder()
+ .setWindowingStrategyId("window-strategy")
+ .setCoderId("2L-output-coder")
+ .build())
+ .putWindowingStrategies(
+ "window-strategy",
+ WindowingStrategy.newBuilder()
+ .setWindowCoderId("window-strategy-coder")
+ .setWindowFn(
+
FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1"))
+ .setOutputTime(OutputTime.Enum.END_OF_WINDOW)
+ .setAccumulationMode(AccumulationMode.Enum.ACCUMULATING)
+
.setTrigger(Trigger.newBuilder().setAlways(Always.getDefaultInstance()))
+ .setClosingBehavior(ClosingBehavior.Enum.EMIT_ALWAYS)
+ .setOnTimeBehavior(OnTimeBehavior.Enum.FIRE_ALWAYS)
+ .build())
+
.setTimerApiServiceDescriptor(ApiServiceDescriptor.newBuilder().setUrl("url").build())
+ .putCoders("string_coder",
CoderTranslation.toProto(StringUtf8Coder.of()).getCoder())
+ .putCoders(
+ "2L-output-coder",
+ Coder.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+ .addComponentCoderIds("string_coder")
+ .addComponentCoderIds("string_coder")
+ .build())
+ .putCoders(
+ "window-strategy-coder",
+ Coder.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+ .build())
+ .build())
+ .putCoders(
+ "timer-coder",
+ Coder.newBuilder()
+
.setSpec(FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN))
+ .addComponentCoderIds("string_coder")
+ .addComponentCoderIds("window-strategy-coder")
+ .build())
+ .build();
+ Map<String, Message> fnApiRegistry = ImmutableMap.of("1L",
processBundleDescriptor);
+
+ Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
+ Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
+ urnToPTransformRunnerFactoryMap.put(
+ DATA_INPUT_URN,
+ (PTransformRunnerFactory<Object>)
+ (context) -> {
+ context.addIncomingDataEndpoint(
+ ApiServiceDescriptor.getDefaultInstance(),
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
+ (input) -> {
+ SimpleRecordingDoFn.consumedData.add(input.getValue());
+ });
+ return null;
+ });
+
+ Mockito.doAnswer(
+ (invocation) -> {
+ // A no op consumer for timers.
+ return new CloseableFnDataReceiver() {
+ @Override
+ public void accept(Object input) throws Exception {}
+
+ @Override
+ public void flush() throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+ };
+ })
+ .when(beamFnDataClient)
+ .send(any(), any(), any());
+
+ return new ProcessBundleHandler(
+ PipelineOptionsFactory.create(),
+ Collections.emptySet(),
+ fnApiRegistry::get,
+ beamFnDataClient,
+ null /* beamFnStateClient */,
+ null /* finalizeBundleHandler */,
+ new ShortIdMap(),
+ urnToPTransformRunnerFactoryMap,
+ new BundleProcessorCache());
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedData = ByteString.newOutput();
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("",
"data"), encodedData);
+ ByteString.Output encodedTimer = ByteString.newOutput();
+ Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+ .encode(
+ Timer.of(
+ "",
+ "timer_id",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ Instant.ofEpochMilli(1L),
+ Instant.ofEpochMilli(1L),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ encodedTimer);
+
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+ .setData(encodedData.toByteString())
+ .build())
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+ .setIsLast(true)
+ .build())
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+ .setTimers(encodedTimer.toByteString())
+ .build())
+ .addTimers(
+ Timers.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+ .setTimerFamilyId(
+ TimerFamilyDeclaration.PREFIX
+ +
SimpleRecordingDoFn.TIMER_FAMILY_ID)
+ .setIsLast(true)
+ .build())
+ .build()))
+ .build());
+ handler.shutdown();
+ assertThat(SimpleRecordingDoFn.consumedData, contains("data"));
+ assertThat(SimpleRecordingDoFn.firedOnTimerCallbackTimerIds,
contains("timer_id"));
+ // Register timer family outbound receiver.
+ verify(beamFnDataClient).send(any(), any(), any());
+ verifyNoMoreInteractions(beamFnDataClient);
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsWithMalformedData() throws
Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedData = ByteString.newOutput();
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("",
"data"), encodedData);
+
+ assertThrows(
+ "Expect java.lang.IllegalStateException: Unable to find inbound data
receiver for"
+ + " instruction 998L and transform 3L. But was not thrown or
Exception did not match.",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("3L")
+
.setData(encodedData.toByteString())
+ .build())
+ .build()))
+ .build()));
+ assertThrows(
+ "Expect java.lang.RuntimeException: Elements embedded in
ProcessBundleRequest are "
+ + "incomplete. But was not thrown or Exception did not match.",
+ RuntimeException.class,
+ () ->
+ handler.processBundle(
+ InstructionRequest.newBuilder()
+ .setInstructionId("998L")
+ .setProcessBundle(
+ ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L")
+ .setElements(
+ Elements.newBuilder()
+ .addData(
+ Data.newBuilder()
+ .setInstructionId("998L")
+ .setTransformId("2L")
+
.setData(encodedData.toByteString())
+ .build())
+ .build()))
+ .build()));
+ handler.shutdown();
+ }
+
+ @Test
+ public void testInstructionEmbeddedElementsWithMalformedTimers() throws
Exception {
+ ProcessBundleHandler handler =
setupProcessBundleHanlderForSimpleRecordingDoFn();
+
+ ByteString.Output encodedTimer = ByteString.newOutput();
+ Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
+ .encode(
+ Timer.of(
+ "",
+ "timer_id",
+ Collections.singletonList(GlobalWindow.INSTANCE),
+ Instant.ofEpochMilli(1L),
+ Instant.ofEpochMilli(1L),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ encodedTimer);
+
+ assertThrows(
+ "Expect java.lang.IllegalStateException: Unable to find inbound timer
receiver "
+ + "for instruction 998L, transform 4L, and timer family
tfs-timer_family. But was not"
Review comment:
Why "But was not thrown or Exception did not match"? Please describe the expected failure directly instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]