Hi all,

I am writing to Elasticsearch from Beam (on Google Dataflow). At the moment I
pass the index name to ElasticsearchIO.Write() as a variable when deploying the
Dataflow job. Instead of one static index, I want to write to multiple
Elasticsearch indices. The index name is carried in each Pub/Sub message, so I
want to read the row that is currently being written and set the Elasticsearch
index based on that value.

My current code is:
ConnectionConfiguration config =
   ConnectionConfiguration.create(
   options().getNodeAddresses().split(","),
   options().getIndex(),
   options().getDocumentType())
   .withUsername(options().getUsername())
   .withPassword(options().getPassword());

I need to change the "options().getIndex()" line to take the index from the
Pub/Sub message instead. The message field that holds the index name is
esIndex, so I tried replacing that line with "doc -> doc.get("esIndex")", but I
get the following error: "java.lang.String is not a functional interface".
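
To show what I think the error means, here is a minimal sketch in plain Java,
with no Beam involved. IndexFn, staticIndex and resolveIndex are hypothetical
names made up for illustration and are not part of ElasticsearchIO. As far as I
can tell, a lambda only compiles where the parameter type is a functional
interface, and the index argument of ConnectionConfiguration.create is a plain
String:

```java
import java.util.HashMap;
import java.util.Map;

public class IndexRoutingSketch {

    // Hypothetical functional interface: exactly one abstract method,
    // so a lambda can be used wherever this type is expected.
    @FunctionalInterface
    public interface IndexFn {
        String indexFor(Map<String, String> doc);
    }

    // Stand-in for an API that takes the index as a String: the value is
    // fixed once, when the call is made, and cannot depend on a document.
    public static String staticIndex(String index) {
        return index;
    }

    // Stand-in for an API that takes a per-document function: the lambda
    // is evaluated for each document that is passed in.
    public static String resolveIndex(IndexFn fn, Map<String, String> doc) {
        return fn.indexFor(doc);
    }

    public static void main(String[] args) {
        Map<String, String> doc = new HashMap<>();
        doc.put("esIndex", "orders-2020");
        doc.put("payload", "example");

        // Does not compile, because String is not a functional interface:
        // staticIndex(d -> d.get("esIndex"));

        // Compiles, because IndexFn is a valid target type for the lambda.
        System.out.println(resolveIndex(d -> d.get("esIndex"), doc));
    }
}
```

So I assume I need some hook on the write side that accepts a per-document
function, rather than a different value for the String argument.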

Does anyone know how to set the Elasticsearch index from the Pub/Sub message
that is currently being written?

Best,
Arif
