[ 
https://issues.apache.org/jira/browse/FLINK-25211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Roberts updated FLINK-25211:
---------------------------------
    Description: 
Hi there, let's say I'm writing a Flink job that inserts data into 
Elasticsearch, and I'm importing the Elasticsearch REST client in my job.

 

Specifically I have this at the moment, but my deployed Elasticsearch version 
is 7.15.1 (more on this shortly):

 

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.10.1</version>
    </dependency>

 

I know that works just fine when my deployed Elasticsearch is 7.8.x, but when 
I switch to Elasticsearch 7.15.1, I see intermittent authorisation issues 
(401s come back and headers are not added to subsequent requests, so we have 
to implement retries in our Flink configuration and/or in the Flink job 
itself). 

 

While there may be an issue with the particular code I'm writing (we have our 
own security plugin), I do then have questions around which versions of 
Elasticsearch our connector in Flink is known to work with.

 

Let's say I upgrade the REST client version in my Flink job to 7.15.1, to 
match my Elasticsearch version, which might seem sensible.

 

If I do that, my JobManagers immediately crash because TimeValue has moved in 
the Elasticsearch code: see here 
[https://github.com/elastic/elasticsearch/commit/68817d7ca29c264b3ea3f766737d81e2ebb4028c#diff-d8695a17187facb254e5ba4b900c7d5a555e5ae5377f38704209e5e064ea630e]

 

Specifically we see this at startup:

 

java.lang.NoClassDefFoundError: org.elasticsearch.common.unit.TimeValue
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.configureFlushInterval(ElasticsearchSinkBase.java:420) ~[?:?]
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.buildBulkProcessor(ElasticsearchSinkBase.java:393) ~[?:?]
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:319) ~[?:?]
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at <unknown class>.call(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at <unknown class>.run(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.unit.TimeValue
    at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClassHelper(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]

(omitted the rest for brevity - although this is still quite long)
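
For anyone who wants to confirm which TimeValue package a given client version 
puts on the classpath, a small probe along these lines might help. This is 
only a sketch: the class name ClasspathProbe is hypothetical, and the new 
package name org.elasticsearch.core.TimeValue is my reading of the linked 
commit rather than something I have confirmed against every 7.15.x artifact.

```java
public class ClasspathProbe {
    // Package the Flink connector references (taken from the stack trace above).
    public static final String OLD_TIME_VALUE = "org.elasticsearch.common.unit.TimeValue";
    // Assumed new location after the move; treat this name as an assumption.
    public static final String NEW_TIME_VALUE = "org.elasticsearch.core.TimeValue";

    // Returns true if the named class can be located by this class's loader,
    // without running the target class's static initializers.
    public static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(OLD_TIME_VALUE + " present: " + isPresent(OLD_TIME_VALUE));
        System.out.println(NEW_TIME_VALUE + " present: " + isPresent(NEW_TIME_VALUE));
    }
}
```

Run with the same dependencies as the job jar: if only the second name is 
reported as present, the connector's reference to the old package cannot 
resolve, which would be consistent with the NoClassDefFoundError above.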

 

and so I think - fine, what does the documentation say? 

 

If we look at 
[https://github.com/apache/flink/blob/master/docs/content/docs/connectors/datastream/elasticsearch.md], 
it states Elasticsearch 7.5.1. 

 

And if we look here, we see that the current Flink connector code still 
imports TimeValue from the old package: 
[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L40]

 

And this leads me to this JIRA post: is the current documentation accurate in 
that we can only use up to Elasticsearch 7.5.1, reliably, with the connector 
code in Flink? 

 

What is the "lay of the land" as far as the Flink community and Elastic are 
concerned: is this an area that's actively looked into, or is the general 
consensus that one should write their own connector of sorts? 

 

Are there plans to increase said version? Does it have to be this for a 
particular reason?

 

Many thanks in advance. I can say that we've been using the 7.8 client just 
fine, but the move to test with Elasticsearch 7.15 has come with a few 
challenges, so I'm hoping the Flink community can shed some light in this 
direction.

 

Cheers,

> Elasticsearch connector version compatibility
> ---------------------------------------------
>
>                 Key: FLINK-25211
>                 URL: https://issues.apache.org/jira/browse/FLINK-25211
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Adam Roberts
>            Priority: Major
>              Labels: question
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
