Hi all,

I'm wondering if somebody could help shed some light on the behavior of 
Pipeline.replaceAll, particularly the outputs to expect after the replacement.
I’m currently looking into supporting ValidatesRunner (VR) tests for SparkRunner 
in streaming mode [1]. Unfortunately, I didn’t succeed in replacing (wrapping) 
the bounded Create.Values used as test input with an unbounded source in such a 
way that the node’s output would be UNBOUNDED. After the replacement, the node's 
output is still the original BOUNDED one.
Is this expected? What would be the recommended way to achieve this?

Below is some code to explain further [2].
On a related note, [3] is a tiny PR that fixes a broken assertion in 
PipelineTest.testReplaceAll().

Thanks,
Moritz

    pipeline.apply("boundedToUnbounded", Create.of(0L));

    pipeline.replaceAll(
        ImmutableList.of(
            PTransformOverride.of(
                application -> application.getTransform() instanceof Create.Values,
                // Replacement is GenerateSequence.from(Iterables.getOnlyElement(elements))
                new ValuesToUnboundedSequenceOverride())));

    pipeline.traverseTopologically(
        new PipelineVisitor.Defaults() {
          @Override
          public CompositeBehavior enterCompositeTransform(Node node) {
            if (node.getFullName().equals("boundedToUnbounded")) {
              assertThat(node.getTransform(), Matchers.instanceOf(GenerateSequence.class));

              // FIXME Node still contains the original BOUNDED output. But why?
              PCollection<?> output = Iterables.getOnlyElement(node.getOutputs().values());
              assertThat(output.isBounded(), Matchers.equalTo(PCollection.IsBounded.UNBOUNDED));
            }
            return CompositeBehavior.ENTER_TRANSFORM;
          }
        });


[1] https://github.com/apache/beam/pull/22473
[2] https://github.com/apache/beam/compare/master...mosche:beam:BoundedToUnboundedReplaceAll
[3] https://github.com/apache/beam/pull/22485

