Hi all,
I am writing to Elasticsearch from Beam (on Google Dataflow). At the moment
I pass the index name to ElasticsearchIO.Write() as a pipeline option when
deploying the Dataflow job. Instead of one static index, I want to write to
multiple Elasticsearch indices. The index name is carried in the messages
on the Pub/Sub topic, so I want to read it from the row that is being
written and set the Elasticsearch index based on that value.
My current code is:
    ConnectionConfiguration config =
        ConnectionConfiguration.create(
                options().getNodeAddresses().split(","),
                options().getIndex(),
                options().getDocumentType())
            .withUsername(options().getUsername())
            .withPassword(options().getPassword());
I need to change the options().getIndex() argument so that the index comes
from the Pub/Sub message instead. The message field that holds the index
name is esIndex, so I tried replacing that argument with
'doc -> doc.get("esIndex")', but I get the following error:
"java.lang.String is not a functional interface".
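To make the intent concrete, here is a minimal plain-Java sketch of the
per-document routing I am after. It uses no Beam classes, and the class,
field, and method names (IndexPerDocumentSketch, INDEX_FN, resolveIndex) are
made up for illustration. As far as I can tell it also shows why my attempt
fails: the index argument in my code is typed as String, and a lambda can
only be passed where the parameter is a functional interface.

```java
import java.util.Map;
import java.util.function.Function;

public class IndexPerDocumentSketch {

    // Hypothetical extractor: reads the target index from the "esIndex" field.
    static final Function<Map<String, String>, String> INDEX_FN =
        doc -> doc.get("esIndex");

    // Hypothetical stand-in for a sink that accepts a function instead of a
    // fixed String index; it returns the index a document would be routed to.
    static String resolveIndex(Map<String, String> doc,
                               Function<Map<String, String>, String> indexFn) {
        return indexFn.apply(doc);
    }

    public static void main(String[] args) {
        Map<String, String> first = Map.of("esIndex", "orders", "id", "1");
        Map<String, String> second = Map.of("esIndex", "customers", "id", "2");

        // Each document resolves to the index named in its own esIndex field.
        System.out.println(resolveIndex(first, INDEX_FN));
        System.out.println(resolveIndex(second, INDEX_FN));
    }
}
```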
Does anyone know how to set the Elasticsearch index from the Pub/Sub
message that is currently being written?
Best,
Arif