Hi Kobi,

Missed this earlier. Could you describe how it is limiting you?

The timestampFn depends on the type of key and value. Thats why once you
set it, we don't allow modifying keyCoder() or valueCoder() etc.

e.g. scenario we want to avoid (if we didn't have this restriction) :

KafakIO.read()
   .withKeyCoder(StringUtf8Coder.of())
   .withValueCoder(ByteArrayCoder.of())
   .withTimestampFn(fn_that_takes(KV<String, byte[]>)
   .withKeyCoder(BigEndianLongCoder.of()) // <====== oops this invalidates
timestampFn, but not by compiler.

There is a proposal to change KafkaIO API a bit that would remove this
restriction (WIP : https://github.com/apache/incubator-beam/pull/1048). The
proposed fix is to start with specify key and value type and not allow
changing the type. The above example instead would look like:

KafakIO*<String, byte[]>*.read()            // Type parameters are required
to start with.
   .withKeyCoder(StringUtf8Coder.of())
   .withValueCoder(ByteArrayCoder.of())
   .withTimestampFn(fn_that_takes(KV<String, byte[]>)
   .withKeyCoder(BigEndianLongCoder.of()) // <==== this would not compile



On Thu, Sep 8, 2016 at 12:39 AM, Kobi Salant <[email protected]> wrote:

> Hi,
>
> We are trying to use withTimestampFn, it returns a TypedRead object and
> there is no way to get back to Read class.
>
> It means that we have to call KafkaIO read methods in a specific order and
> call withTimestampFn at the end otherwise it breaks.
>
> @Raghu - is there any reason for it? Do you have any recommendation how to
> solve it?
>
> 10x
> Kobi
>

Reply via email to