Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/1962#discussion_r62173042
--- Diff:
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
---
@@ -177,7 +180,72 @@ public void open(Configuration configuration) {
if (LOG.isInfoEnabled()) {
LOG.info("Created Elasticsearch TransportClient {}",
client);
}
+ }
+
+ @Override
+ public void invoke(final T element) {
+ ParameterTool params = ParameterTool.fromMap(userConfig);
+
+ if (params.has(CONFIG_NO_OF_CONN_RETRIES) &&
params.getInt(CONFIG_NO_OF_CONN_RETRIES) > 0) {
+ final Timer timer = new Timer(true);
+ TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ // verify that we actually are connected to a cluster
+ ImmutableList<DiscoveryNode> nodes =
ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+ if (nodes.isEmpty()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Connection Lost..Trying to reconnect to Elasticsearch nodes...");
+ }
+ open(new Configuration());
+ } else {
+ timer.cancel();
+
intializeAndCallElasticSearchSinkFunction(element);
+ }
+ }
+ };
+
+ timer.scheduleAtFixedRate(task, 0, 3000);
+
+ try {
+ Thread.sleep(3000 *
params.getInt(CONFIG_NO_OF_CONN_RETRIES));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ timer.cancel();
+ // verify that we actually are connected to a cluster
+ ImmutableList<DiscoveryNode> nodes =
ImmutableList.copyOf(((TransportClient) client).connectedNodes());
+ if (nodes.isEmpty()) {
+ throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
+ }
+ } else {
+ intializeAndCallElasticSearchSinkFunction(element);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (bulkProcessor != null) {
+ bulkProcessor.close();
+ bulkProcessor = null;
+ }
+
+ if (client != null) {
+ client.close();
+ }
+
+ if (hasFailure.get()) {
+ Throwable cause = failureThrowable.get();
+ if (cause != null) {
+ throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
+ } else {
+ throw new RuntimeException("An error occured in ElasticsearchSink.");
+ }
+ }
+ }
+
+ private void intializeAndCallElasticSearchSinkFunction(T element) {
--- End diff --
Where is the method initializing the ES sink?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---