Hello,
I am seeing below error when I try to use ElasticsearchSink. It complains about
serialization and looks like it is leading to "IndexRequestBuilder"
implementation. I have tried the suggestion as mentioned in
http://stackoverflow.com/questions/33246864/elasticsearch-sink-seralizability
(changed from anonymous class to concrete class) but it did not help. However,
when I call "ElasticsearchSink<>(config, transports, null)" by passing "null"
for "IndexRequestBuilder" then I don't see the serialization error. This
suggests the problem could be with the IndexRequestBuilder implementation but I
am not able to move further.
Could someone please let me know what's the right way to use
ElasticsearchSink() API?
Build DetailsFlink 1.2.0Elastic Search 5.3.0
Error Message
org.apache.flink.api.common.InvalidProgramException: The implementation of the
RichSinkFunction is not serializable. The object probably contains or
references non serializable fields. at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1539)
at
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:161)
at
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1076)
Code Snippet
``` private ElasticsearchSink<Result> sinkToElasticSearch(AppConfiguration
appConfiguration) throws Exception {
String host = appConfiguration.getPipeline().getElasticSearch().getHost(); int
port = appConfiguration.getPipeline().getElasticSearch().getPort(); String
cluster = appConfiguration.getPipeline().getElasticSearch().getCluster();
Map<String, String> config = new HashMap<>();
config.put("bulk.flush.max.actions", "1"); config.put("cluster.name", cluster);
List<TransportAddress> transports = new ArrayList<>(); transports.add(new
InetSocketTransportAddress(host, port));
return new ElasticsearchSink<>(config, transports, new
ResultIndexRequestBuilder(appConfiguration)); }
public class ResultIndexRequestBuilder implements IndexRequestBuilder<Result>,
Serializable {
private String index; private String type; //private transient Gson gson = new
Gson();
public ResultIndexRequestBuilder() {}
public ResultIndexRequestBuilder(AppConfiguration appConfiguration) { index =
appConfiguration.getPipeline().getElasticSearch().getIndex(); type =
appConfiguration.getPipeline().getElasticSearch().getType(); }
@Override public IndexRequest createIndexRequest(Result result, RuntimeContext
ctx) { Gson gson = new Gson(); String resultAsJson = gson.toJson(result);
System.out.println(resultAsJson); Map<String, String> jsonMap = new
HashMap<>(); jsonMap.put("data", resultAsJson);
return Requests.indexRequest() .index(index) .type(type) .source(jsonMap); }```
RegardsVijay