Yes, our recommendation to the users is the same as you described, passing
in the inputs as parameters. This fits many use cases. We also have users
wrapping IOs in their own composite transforms and/or having inputs
scattered in different modules/libraries. Passing in inputs doesn't work
well in these use cases, so we are thinking about whether we can provide a
way to override the input transform. From the discussion above, it seems
the current TransformOverride is not intended for this usage. I will take a
closer look at the code to see whether this is achievable (and see whether
we need more dependency from the core). Seems in portable pipeline Fuse can
manipulate the structure of the pipeline, but I am not sure about the java
pipeline.

Thanks,
Xinyu

On Wed, Apr 21, 2021 at 9:40 AM Robert Burke <rob...@frantil.com> wrote:

> My general answer for this is to avoid bundling the IOs with the rest of
> the pipeline. Have the Input collection be a parameter to a function that
> constructs the rest of the pipeline, which returns its intended
> PCollections as outputs.
>
> No need to go as far as wrap the whole construction function as a
> Composite, but that's similar.
>
> Runners providing features to make it easier to test the way you describe,
> though does sound very useful, but it does require the runner be aware of
> each transform to be overridden, possibly increasing the runners dependency
> surface.
>
> On Wed, Apr 21, 2021, 9:31 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
>> @Chamikara: Yuhong and I are working on Samza Runner, and we are looking
>> for a way to swap the transform for ease of use in testing.
>>
>> @Reuven: Your approach will work for this case, but we do have quite a
>> few read transforms here and we have to plug this code in each of time with
>> some testing logic there too. Seems not very clean to me to have testing
>> code mixed with real logic. It will be hard to maintain in the long run if
>> we add more read transforms in the future. It will be much nicer if we can
>> leverage something like TransformOverrides to replace a transform entirely
>> without messing around the existing code.
>>
>> Thanks,
>> Xinyu
>>
>> On Tue, Apr 20, 2021 at 10:00 PM Boyuan Zhang <boyu...@google.com> wrote:
>>
>>> +1 to use pipeline options.
>>>
>>>  Alternatively, you can also change your KafkaReadTransform to perform
>>> different expansion(override expand()) based on your pipeline options.
>>>
>>> On Tue, Apr 20, 2021 at 9:51 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> It would be simpler to create a custom pipeline option, and swap out
>>>> the read transform in your code. For example
>>>>
>>>> PCollection<Type> pc;
>>>> if (options.getLocalTest()) {
>>>>   pc = pipeline.apply(new ReadFromLocalFile());
>>>> } else {
>>>>   pc = pipeline.apply(new KafkaReadTrasnform());
>>>> }
>>>>
>>>> pc.apply(/* rest of pipeline */);
>>>>
>>>> On Tue, Apr 20, 2021 at 9:41 PM Yuhong Cheng <mabelyuhong0...@gmail.com>
>>>> wrote:
>>>>
>>>>> We want to support transform override when doing tests locally.  For
>>>>> example, in real pipelines, we read from Kafka, but when doing tests
>>>>> locally, we want to read from a local file to help test whether the
>>>>> pipeline works fine. So we want to override the Kafka read transform
>>>>> directly instead of writing the pipeline twice.
>>>>>
>>>>> code example:
>>>>>
>>>>> public Pipeline createPipeline(Pipeline pipeline) {
>>>>>
>>>>>    pipeline.apply(new KafkaReadTransform()).apply(// other
>>>>> functions..);
>>>>> }
>>>>> In test, we will use the same createPipeline() function to create a
>>>>> pipeline but meanwhile we want to override KafkaReadTransform with another
>>>>> transform to avoid reading from Kafka.
>>>>>
>>>>> Thanks,
>>>>> Yuhong
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 20, 2021 at 9:02 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> In general, TransformOverrides are expected to be per-runner
>>>>>> implementation details and are not expected to be directly used by
>>>>>> end-users.
>>>>>> What is the exact use-case you are trying to achieve ? Are you
>>>>>> running into a missing feature of an existing transform ?
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Tue, Apr 20, 2021 at 5:58 PM Yuhong Cheng <
>>>>>> mabelyuhong0...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Beam,
>>>>>>> We have a use case when creating a pipeline, we want to replace the
>>>>>>> IO read/write transform when testing using 
>>>>>>> `pipeline.replaceAll(overrides)`.
>>>>>>> However, we met some problems when doing tests:
>>>>>>> 1. Are there any ways we can avoid calling expand() of a transform
>>>>>>> when it is going to be replaced?  The reason we want to override a
>>>>>>> transform is because that the expand() of this transform is somehow not
>>>>>>> available in some situations. It seems not reasonable enough to call the
>>>>>>> expand() of the originalTransform and then call the expand() of the
>>>>>>> overrideTransform again?
>>>>>>> 2. When trying to implement `PTransformOverrideFactory`, we realize
>>>>>>> that the inputs are `TaggedPValue`, which can only make {Tuple,
>>>>>>> PCollection} pairs. Then if we want to override a write transform whose
>>>>>>> output type is `PDone`, what's the best way to implement this factory?
>>>>>>>
>>>>>>>
>>>>>>> Thanks in advance for answers! This is quite important to our
>>>>>>> pipelines.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Yuhong
>>>>>>>
>>>>>>

Reply via email to