Hi Kobi, Missed this earlier. Could you describe how it is limiting you?
The timestampFn depends on the type of key and value. Thats why once you set it, we don't allow modifying keyCoder() or valueCoder() etc. e.g. scenario we want to avoid (if we didn't have this restriction) : KafakIO.read() .withKeyCoder(StringUtf8Coder.of()) .withValueCoder(ByteArrayCoder.of()) .withTimestampFn(fn_that_takes(KV<String, byte[]>) .withKeyCoder(BigEndianLongCoder.of()) // <====== oops this invalidates timestampFn, but not by compiler. There is a proposal to change KafkaIO API a bit that would remove this restriction (WIP : https://github.com/apache/incubator-beam/pull/1048). The proposed fix is to start with specify key and value type and not allow changing the type. The above example instead would look like: KafakIO*<String, byte[]>*.read() // Type parameters are required to start with. .withKeyCoder(StringUtf8Coder.of()) .withValueCoder(ByteArrayCoder.of()) .withTimestampFn(fn_that_takes(KV<String, byte[]>) .withKeyCoder(BigEndianLongCoder.of()) // <==== this would not compile On Thu, Sep 8, 2016 at 12:39 AM, Kobi Salant <[email protected]> wrote: > Hi, > > We are trying to use withTimestampFn, it returns a TypedRead object and > there is no way to get back to Read class. > > It means that we have to call KafkaIO read methods in a specific order and > call withTimestampFn at the end otherwise it breaks. > > @Raghu - is there any reason for it? Do you have any recommendation how to > solve it? > > 10x > Kobi >
