It looks like you should be able to accomplish this with withIndexFn [1].
Does that work for you?
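
A minimal sketch of what that could look like, assuming each document is JSON with a top-level string field named esIndex (the field name from your message). The class and method names here (DynamicIndexWrite, buildWrite) are hypothetical, and I have not run this against a cluster. As far as I can tell, ConnectionConfiguration.create() still needs an index argument, and the function passed to withIndexFn then picks the index per document:

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;

// Hypothetical helper class, for illustration only.
public class DynamicIndexWrite {

  // Builds a Write transform that chooses the target index per document.
  static ElasticsearchIO.Write buildWrite(
      String[] addresses, String defaultIndex, String documentType) {
    // create() takes plain Strings, which is why passing a lambda in place of
    // the index fails with "java.lang.String is not a functional interface".
    ConnectionConfiguration config =
        ConnectionConfiguration.create(addresses, defaultIndex, documentType);

    return ElasticsearchIO.write()
        .withConnectionConfiguration(config)
        // withIndexFn takes a FieldValueExtractFn, i.e. a serializable
        // function from the parsed document (JsonNode) to the index name.
        // Assumption: every document has an "esIndex" field; doc.get()
        // returns null when the field is missing, so this would throw.
        .withIndexFn((JsonNode doc) -> doc.get("esIndex").asText());
  }
}
```

You would then apply the returned transform to your PCollection<String> of JSON documents in place of the current write step.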

Brian

[1]
https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html#withIndexFn-org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write.FieldValueExtractFn-

On Wed, Jan 13, 2021 at 1:15 AM Arif Alili <[email protected]> wrote:

> Hi all,
>
> I am writing to Elasticsearch using Beam (Google Dataflow). At the moment
> I pass the index name to ElasticsearchIO.Write() as a variable when
> deploying the Dataflow job. Instead of one static index, I want to insert
> into multiple Elasticsearch indices. The index name comes in on the Pub/Sub
> topic, so I want to read the row that is currently being written and set
> the Elasticsearch index based on that value.
>
> My current code is:
> ConnectionConfiguration config =
>    ConnectionConfiguration.create(
>    options().getNodeAddresses().split(","),
>    options().getIndex(),
>    options().getDocumentType())
>    .withUsername(options().getUsername())
>    .withPassword(options().getPassword());
>
> I need to change the "options().getIndex()" line to take the index from the
> Pub/Sub topic instead. The field that stores the index name is esIndex, so I
> tried replacing that line with "doc -> doc.get("esIndex")", but I get the
> following error: "java.lang.String is not a functional interface".
>
> Does anyone know how to set the Elasticsearch index based on the Pub/Sub
> row that is currently being written?
>
> Best,
> Arif
>
>
