It looks like you should be able to accomplish this with withIndexFn [1]. Does that work for you?
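
In case it helps, here is a minimal sketch of what that could look like. It assumes every JSON document being written carries a string field named esIndex (the field name from your message). The class name DynamicIndexWrite and the helper buildWrite are hypothetical, and username/password are left out for brevity. As far as I can tell, ConnectionConfiguration.create still needs an index argument; the value returned by withIndexFn is then used per document.

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;

// Hypothetical helper class, for illustration only.
public class DynamicIndexWrite {

  // Builds a Write transform that picks the target index from each document.
  static ElasticsearchIO.Write buildWrite(
      String[] nodeAddresses, String defaultIndex, String documentType) {
    // The connection still takes a String index; it is not a function,
    // which is why passing a lambda here fails to compile.
    ConnectionConfiguration config =
        ConnectionConfiguration.create(nodeAddresses, defaultIndex, documentType);

    return ElasticsearchIO.write()
        .withConnectionConfiguration(config)
        // FieldValueExtractFn receives the parsed document as a JsonNode.
        // Assumption: "esIndex" is present in every document; get() returns
        // null for a missing field, so add a fallback if that can happen.
        .withIndexFn((JsonNode doc) -> doc.get("esIndex").asText());
  }
}
```

The returned transform would be applied to the PCollection<String> of JSON documents the same way as the current static-index write.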

Brian

[1] https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html#withIndexFn-org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write.FieldValueExtractFn-

On Wed, Jan 13, 2021 at 1:15 AM Arif Alili <[email protected]> wrote:

> Hi all,
>
> I am writing to Elasticsearch using Beam (Google Dataflow). At the moment
> I pass index name to ElasticsearchIO.Write() as variable when deploying the
> dataflow. Instead of having one static index, I want to insert to multiple
> Elasticsearch indices, I have the index name on Pub/Sub topic, so I want to
> read the current row that is being written and set the Elasticsearch index
> based on that value.
>
> My current code is:
> ConnectionConfiguration config =
>     ConnectionConfiguration.create(
>             options().getNodeAddresses().split(","),
>             options().getIndex(),
>             options().getDocumentType())
>         .withUsername(options().getUsername())
>         .withPassword(options().getPassword());
>
> I need to change "options().getIndex()" line to get the index from Pub/Sub
> topic instead. Pub/Sub topic field where index name is stored is: esIndex,
> so I tried replacing that line with "doc -> doc.get("esIndex")" but I see
> the next error: "java.lang.String is not a functional interface".
>
> Does anyone know how to set the Elasticsearch index reading Pub/Sub topic
> that is currently being written?
>
> Best,
> Arif
