@Chamikara: Yuhong and I are working on Samza Runner, and we are looking
for a way to swap the transform for ease of use in testing.

@Reuven: Your approach will work for this case, but we do have quite a few
read transforms here and we have to plug this code in each of time with
some testing logic there too. Seems not very clean to me to have testing
code mixed with real logic. It will be hard to maintain in the long run if
we add more read transforms in the future. It will be much nicer if we can
leverage something like TransformOverrides to replace a transform entirely
without messing around the existing code.

Thanks,
Xinyu

On Tue, Apr 20, 2021 at 10:00 PM Boyuan Zhang <boyu...@google.com> wrote:

> +1 to use pipeline options.
>
>  Alternatively, you can also change your KafkaReadTransform to perform
> different expansion(override expand()) based on your pipeline options.
>
> On Tue, Apr 20, 2021 at 9:51 PM Reuven Lax <re...@google.com> wrote:
>
>> It would be simpler to create a custom pipeline option, and swap out the
>> read transform in your code. For example
>>
>> PCollection<Type> pc;
>> if (options.getLocalTest()) {
>>   pc = pipeline.apply(new ReadFromLocalFile());
>> } else {
>>   pc = pipeline.apply(new KafkaReadTrasnform());
>> }
>>
>> pc.apply(/* rest of pipeline */);
>>
>> On Tue, Apr 20, 2021 at 9:41 PM Yuhong Cheng <mabelyuhong0...@gmail.com>
>> wrote:
>>
>>> We want to support transform override when doing tests locally.  For
>>> example, in real pipelines, we read from Kafka, but when doing tests
>>> locally, we want to read from a local file to help test whether the
>>> pipeline works fine. So we want to override the Kafka read transform
>>> directly instead of writing the pipeline twice.
>>>
>>> code example:
>>>
>>> public Pipeline createPipeline(Pipeline pipeline) {
>>>
>>>    pipeline.apply(new KafkaReadTransform()).apply(// other functions..);
>>> }
>>> In test, we will use the same createPipeline() function to create a
>>> pipeline but meanwhile we want to override KafkaReadTransform with another
>>> transform to avoid reading from Kafka.
>>>
>>> Thanks,
>>> Yuhong
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Apr 20, 2021 at 9:02 PM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>> In general, TransformOverrides are expected to be per-runner
>>>> implementation details and are not expected to be directly used by
>>>> end-users.
>>>> What is the exact use-case you are trying to achieve ? Are you running
>>>> into a missing feature of an existing transform ?
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Tue, Apr 20, 2021 at 5:58 PM Yuhong Cheng <mabelyuhong0...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Beam,
>>>>> We have a use case when creating a pipeline, we want to replace the IO
>>>>> read/write transform when testing using `pipeline.replaceAll(overrides)`.
>>>>> However, we met some problems when doing tests:
>>>>> 1. Are there any ways we can avoid calling expand() of a transform
>>>>> when it is going to be replaced?  The reason we want to override a
>>>>> transform is because that the expand() of this transform is somehow not
>>>>> available in some situations. It seems not reasonable enough to call the
>>>>> expand() of the originalTransform and then call the expand() of the
>>>>> overrideTransform again?
>>>>> 2. When trying to implement `PTransformOverrideFactory`, we realize
>>>>> that the inputs are `TaggedPValue`, which can only make {Tuple,
>>>>> PCollection} pairs. Then if we want to override a write transform whose
>>>>> output type is `PDone`, what's the best way to implement this factory?
>>>>>
>>>>>
>>>>> Thanks in advance for answers! This is quite important to our
>>>>> pipelines.
>>>>>
>>>>> Thanks,
>>>>> Yuhong
>>>>>
>>>>

Reply via email to