[ 
https://issues.apache.org/jira/browse/FLINK-11747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16778196#comment-16778196
 ] 

Samir Desai edited comment on FLINK-11747 at 2/26/19 5:07 PM:
--------------------------------------------------------------

[~Salatich], That is correct. The issue is more that it's very difficult to get 
access to the high level rest client using the ES flink connector out-of-box. 
The [base 
sink|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L299]
 calls {{bridge.createClient()}}, and the [Elasticsearch 6 
sink|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java#L72]
 internally instantiates the call bridge, so adding sniffing requires 
inheriting or reimplementing these classes.

When attempting to add sniffing, I realized that many of the classes (including 
the base sink and the Elasticsearch6 sink) cannot be extended easily, because 
of private member variables and constructors. Additionally, they're often 
marked with {{@PublicEvolving}} or {{@Internal}}, and you have reimplement the 
interface with your own implementation, but this requires copying a significant 
amount of code.

Additionally, the Elasticsearch6 sink relies on a package private 
[BulkProcessorIndexer|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java],
 so if you reimplement the Elasticsearch6 sink in your project, you have to 
copy this file into your own project.


was (Author: yosami):
That is correct. The issue is more that it's very difficult to get access to 
the high level rest client using the ES flink connector out-of-box. The [base 
sink|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L299]
 calls {{bridge.createClient()}}, and the [Elasticsearch 6 
sink|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java#L72]
 internally instantiates the call bridge, so adding sniffing requires 
inheriting or reimplementing these classes.

When attempting to add sniffing, I realized that many of the classes (including 
the base sink and the Elasticsearch6 sink) cannot be extended easily, because 
of private member variables and constructors. Additionally, they're often 
marked with {{@PublicEvolving}} or {{@Internal}}, and you have reimplement the 
interface with your own implementation, but this requires copying a significant 
amount of code.

Additionally, the Elasticsearch6 sink relies on a package private 
[BulkProcessorIndexer|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java],
 so if you reimplement the Elasticsearch6 sink in your project, you have to 
copy this file into your own project.

> Elasticsearch 6 connector - Expose RestHighLevelClient to allow for custom 
> sniffing
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-11747
>                 URL: https://issues.apache.org/jira/browse/FLINK-11747
>             Project: Flink
>          Issue Type: Improvement
>          Components: ElasticSearch Connector
>            Reporter: Samir Desai
>            Priority: Major
>
> In the Elasticsearch6 connector, the 
> [RestClientFactory|https://github.com/apache/flink/blob/release-1.6/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java#L31]
>  allows users to customize the {{RestClientBuilder}}. However, certain 
> customizations like adding 
> [Sniffing|https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/_usage.html]
>  
> require access to the low-level rest client, which can be obtained through 
> the high level rest client. The {{RestHighLevelClient}} is 
> [instantiated|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java#L75]
>  in the api call bridge, and is never exposed to the user.
> To my knowledge, the current Elasticsearch6 connector does not utilize 
> sniffing or provide a way to add it in. The Elasticsearch6 connector should 
> expose some type of access to the RestHighLevelClient to allow for custom 
> sniffing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to