+1

What is not supported yet is the wildcard indexes (index* pattern). It will be when the refactoring (use high level ES objects rather than low level REST String objects) is done.

Best

Etienne

On 13/01/2021 16:45, Brian Hulette wrote:
It looks like you should be able to accomplish this with withIndexFn [1]. Does that work for you?

Brian

[1] https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html#withIndexFn-org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write.FieldValueExtractFn-

On Wed, Jan 13, 2021 at 1:15 AM Arif Alili <[email protected] <mailto:[email protected]>> wrote:

    Hi all,

    I am writing to Elasticsearch using Beam (Google Dataflow). At the
    moment I pass index name to ElasticsearchIO.Write() as variable
    when deploying the dataflow. Instead of having one static index, I
    want to insert to multiple Elasticsearch indices, I have the index
    name on Pub/Sub topic, so I want to read the current row that is
    being written and set the Elasticsearch index based on that value.

    My current code is:
    ConnectionConfiguration config =
    ConnectionConfiguration.create(
    options().getNodeAddresses().split(","),
    options().getIndex(),
    options().getDocumentType())
       .withUsername(options().getUsername())
       .withPassword(options().getPassword());

    I need to change "options().getIndex()" line to get the index from
    Pub/Sub topic instead. Pub/Sub topic field where index name is
    stored is: esIndex, so I tried replacing that line with "doc ->
    doc.get("esIndex")" but I see the next error: "java.lang.String is
    not a functional interface".

    Does anyone know how to set the Elasticsearch index reading
    Pub/Sub topic that is currently being written?

    Best,
    Arif

Reply via email to