Hi all,

I'm wondering if somebody could help and shed some light on the behavior of Pipeline.replaceAll, particularly on which outputs to expect after the replacement.

I'm currently looking into supporting ValidatesRunner (VR) tests for SparkRunner in streaming mode [1]. Unfortunately, I didn't succeed in replacing (wrapping) the bounded Create.Values used as test input with an unbounded source in such a way that the node's output would be UNBOUNDED. After the replacement, the output is actually still the original one. Is this expected? What would be the recommended way to achieve this?
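To make the symptom concrete, here is a toy model in plain Java. It uses hypothetical stand-in classes, not Beam's actual implementation. It assumes that a replacement swaps the node's transform and expands the new one, but then maps the replacement's output back onto the original output object so downstream consumers keep pointing at the same collection. Under that assumption, the node reports the new transform together with the original BOUNDED output, which matches what I observe.

```java
import java.util.function.Function;

// Toy model (hypothetical classes, NOT Beam's implementation) of a pipeline node
// whose transform is replaced while the original output object is kept.
public class ToyReplaceAll {

  enum IsBounded { BOUNDED, UNBOUNDED }

  /** Hypothetical stand-in for a PCollection: only tracks its boundedness. */
  static final class ToyPCollection {
    final IsBounded bounded;

    ToyPCollection(IsBounded bounded) {
      this.bounded = bounded;
    }
  }

  /** Hypothetical stand-in for a node in the transform hierarchy. */
  static final class Node {
    final String fullName;
    String transform;
    ToyPCollection output;

    Node(String fullName, String transform, ToyPCollection output) {
      this.fullName = fullName;
      this.transform = transform;
      this.output = output;
    }
  }

  /**
   * Assumed replacement behavior: the transform is swapped and the replacement is
   * expanded, but its output is then mapped back onto the ORIGINAL output object.
   */
  static void replace(Node node, String newTransform, Function<String, ToyPCollection> expand) {
    ToyPCollection original = node.output;
    // Expanding the replacement produces a new (here UNBOUNDED) output...
    ToyPCollection replacementOutput = expand.apply(newTransform);
    node.transform = newTransform;
    // ...which is discarded in favor of the original output object.
    node.output = original;
  }

  /** Builds the toy equivalent of the "boundedToUnbounded" node and replaces it. */
  static Node demo() {
    Node node =
        new Node("boundedToUnbounded", "Create.Values", new ToyPCollection(IsBounded.BOUNDED));
    replace(node, "GenerateSequence", t -> new ToyPCollection(IsBounded.UNBOUNDED));
    return node;
  }

  public static void main(String[] args) {
    Node node = demo();
    // New transform, but still the original output.
    System.out.println(node.transform + " " + node.output.bounded);
  }
}
```

Running `main` prints `GenerateSequence BOUNDED` in this toy model.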
Below is some code to explain further [2]. Also related, [3] is a tiny PR to fix a broken assertion in PipelineTest.testReplaceAll().

    pipeline.apply("boundedToUnbounded", Create.of(0L));

    pipeline.replaceAll(
        ImmutableList.of(
            PTransformOverride.of(
                application -> application.getTransform() instanceof Create.Values,
                // Replacement is GenerateSequence.from(Iterables.getOnlyElement(elements))
                new ValuesToUnboundedSequenceOverride())));

    pipeline.traverseTopologically(
        new PipelineVisitor.Defaults() {
          @Override
          public CompositeBehavior enterCompositeTransform(Node node) {
            if (node.getFullName().equals("boundedToUnbounded")) {
              assertThat(node.getTransform(), Matchers.instanceOf(GenerateSequence.class));
              // FIXME Node still contains the original BOUNDED output. But why?
              PCollection<?> output = Iterables.getOnlyElement(node.getOutputs().values());
              assertThat(output.isBounded(), Matchers.equalTo(PCollection.IsBounded.UNBOUNDED));
            }
            return CompositeBehavior.ENTER_TRANSFORM;
          }
        });

Thanks,
Moritz

[1] https://github.com/apache/beam/pull/22473
[2] https://github.com/apache/beam/compare/master...mosche:beam:BoundedToUnboundedReplaceAll
[3] https://github.com/apache/beam/pull/22485