[
https://issues.apache.org/jira/browse/BEAM-3201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264477#comment-16264477
]
Etienne Chauchot commented on BEAM-3201:
----------------------------------------
[~nerdynick] regarding what Chet said, and with the knowledge about your use
case that you gave, have you tried the {{Partition}} transform? It allows to
"split" a PCollection (your unbounded one got after reading from kafka) into
several based on a user defined function, allowing then to plug different
{{ESIO.Write()}} transforms to write to different index/type. The only thing is
that it requires to specify the number of partitions when apply {{Partition}}
transform.
If it does not fit, then I prefer having 3 user defined functions that specify
id, type and index values rather than adding metadata fields to the element
stored in the PCollection. Adding them will not break the backward
compatibility because {{with[id|type|index]Fn}} will be optional. In the end,
at writing time, of course, the payload of the ES doc will not contain the
metadata fields, they will be in the regular metadata bulk place
{code}
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{code}
Besides keeping PCollection<String> is mainly to avoid breaking the API. You
said "Doing the work to deserialize, add fields, serialize is a lot of extra
work "; in the user functions you will not need to add fields because the
fields would not be in the output payload. You would just need to parse the
json string to determine the id, type, index names out of the json on each call
to {{processElement}} in the {{ESIO.WriteFn}}. To parse you can use JSonPath of
course.
> ElasticsearchIO should deal with documents id
> ---------------------------------------------
>
> Key: BEAM-3201
> URL: https://issues.apache.org/jira/browse/BEAM-3201
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-extensions
> Reporter: Etienne Chauchot
> Assignee: Chet Aldrich
>
> Today the ESIO only inserts the payload of the ES documents. Elasticsearch
> generates a document id for each record inserted. So each new insertion is
> considered as a new document. Users want to be able to update documents using
> the IO. So, for the write part of the IO, users should be able to provide a
> document id so that they could update already stored documents. Providing an
> id for the documents could also help the user on indempotency.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)