[ 
https://issues.apache.org/jira/browse/BEAM-3201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264477#comment-16264477
 ] 

Etienne Chauchot commented on BEAM-3201:
----------------------------------------

[~nerdynick] regarding what Chet said, and with the knowledge about your use 
case that you gave, have you tried the {{Partition}} transform?  It allows to 
"split" a PCollection (your unbounded one got after reading from kafka) into 
several based on a user defined function, allowing then to plug different 
{{ESIO.Write()}} transforms to write to different index/type. The only thing is 
that it requires to specify the number of partitions when apply {{Partition}} 
transform. 

If it does not fit, then I prefer having 3 user defined functions that specify 
id, type and index values rather than adding metadata fields to the element 
stored in the PCollection. Adding them will not break the backward 
compatibility because {{with[id|type|index]Fn}} will be optional. In the end, 
at writing time, of course, the payload of the ES doc will not contain the 
metadata fields, they will be in the regular metadata bulk place 
{code}
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{code}

Besides keeping PCollection<String> is mainly to avoid breaking the API. You 
said "Doing the work to deserialize, add fields, serialize is a lot of extra 
work "; in the user functions you will not need to add fields because the 
fields would not be in the output payload. You would just need to parse the 
json string to determine the id, type, index names out of the json on each call 
to {{processElement}} in the {{ESIO.WriteFn}}. To parse you can use JSonPath of 
course.

> ElasticsearchIO should deal with documents id
> ---------------------------------------------
>
>                 Key: BEAM-3201
>                 URL: https://issues.apache.org/jira/browse/BEAM-3201
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>            Reporter: Etienne Chauchot
>            Assignee: Chet Aldrich
>
> Today the ESIO only inserts the payload of the ES documents. Elasticsearch 
> generates a document id for each record inserted. So each new insertion is 
> considered as a new document. Users want to be able to update documents using 
> the IO. So, for the write part of the IO, users should be able to provide a 
> document id so that they could update already stored documents. Providing an 
> id for the documents could also help the user on indempotency.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to