[
https://issues.apache.org/jira/browse/FLINK-25211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-25211:
-----------------------------------
Labels: question stale-major (was: question)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 60 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is a Major, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> Elasticsearch connector version compatibility
> ---------------------------------------------
>
> Key: FLINK-25211
> URL: https://issues.apache.org/jira/browse/FLINK-25211
> Project: Flink
> Issue Type: Improvement
> Reporter: Adam Roberts
> Priority: Major
> Labels: question, stale-major
>
> Hi there, let's say I'm writing a Flink job that inserts data into
> Elasticsearch, and I'm importing the Elasticsearch REST client in my job.
>
> Specifically I have this at the moment, but my deployed Elasticsearch version
> is 7.15.1 (more on this shortly):
>
> {code:xml}
> <dependency>
>   <groupId>org.elasticsearch.client</groupId>
>   <artifactId>elasticsearch-rest-high-level-client</artifactId>
>   <version>7.10.1</version>
> </dependency>
> {code}
>
> I know that works just fine when my deployed Elasticsearch is 7.8.x, but when I
> switch to Elasticsearch 7.15.1, I've noticed intermittent issues with
> authorisation (401s come back and headers are not added to subsequent
> requests, so we have to implement retries in our Flink configuration
> and/or the Flink job itself).
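If retries in the job are the stopgap, one option is to wrap each request in a helper that retries only on a 401 and gives up on anything else. This is a minimal sketch under stated assumptions: `HttpStatusException` is a hypothetical exception type carrying the HTTP status (it is not part of the Elasticsearch client or the Flink connector API), and the request is modelled as a plain `java.util.concurrent.Callable`. Retrying a 401 only papers over the missing-header problem described above; it does not fix it.

```java
import java.util.concurrent.Callable;

public class RetryOnUnauthorized {

    /** Hypothetical exception carrying an HTTP status code; not an Elasticsearch or Flink type. */
    static class HttpStatusException extends Exception {
        final int status;

        HttpStatusException(int status) {
            super("HTTP " + status);
            this.status = status;
        }
    }

    /**
     * Calls {@code request} up to {@code maxAttempts} times, retrying only when it fails
     * with a 401 and sleeping {@code backoffMillis * attempt} between attempts.
     */
    static <T> T callWithRetry(Callable<T> request, int maxAttempts, long backoffMillis) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return request.call();
            } catch (HttpStatusException e) {
                // Rethrow anything that is not a 401, or once the attempt budget is spent.
                if (e.status != 401 || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(backoffMillis * attempt); // linear backoff
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated request: fails with 401 twice, then succeeds.
        String result = callWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new HttpStatusException(401);
            }
            return "ok";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Limiting retries to 401 keeps genuine failures (for example a 500 or a mapping error) visible instead of silently retrying them.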
>
> While there may be an issue with the particular code I'm writing (we have our
> own security plugin), I do then have questions around which versions of
> Elasticsearch our connector in Flink is known to work with.
>
> Let's say I upgrade the Elasticsearch REST client version to 7.15.1 to match my
> Elasticsearch version, which might seem sensible.
>
> If I do that, my JobManagers immediately crash because TimeValue has moved to a
> different package in the Elasticsearch code; see
> [https://github.com/elastic/elasticsearch/commit/68817d7ca29c264b3ea3f766737d81e2ebb4028c#diff-d8695a17187facb254e5ba4b900c7d5a555e5ae5377f38704209e5e064ea630e]
>
> Specifically we see this at startup:
>
> {noformat}
> java.lang.NoClassDefFoundError: org.elasticsearch.common.unit.TimeValue
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.configureFlushInterval(ElasticsearchSinkBase.java:420) ~[?:?]
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.buildBulkProcessor(ElasticsearchSinkBase.java:393) ~[?:?]
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:319) ~[?:?]
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at <unknown class>.call(Unknown Source) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at <unknown class>.run(Unknown Source) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.unit.TimeValue
>     at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>     at java.lang.ClassLoader.loadClassHelper(Unknown Source) ~[?:?]
>     at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
> {noformat}
>
> (I've omitted the rest for brevity, although this is still quite long.)
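To confirm which `TimeValue` a job can actually see at runtime, one option is to probe a class loader for both candidate class names. A minimal sketch: the old name comes from the stack trace above, while `org.elasticsearch.core.TimeValue` as the newer location is an assumption about later 7.x clients and should be checked against the client jar you actually ship. `TimeValueProbe` and `probe` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TimeValueProbe {

    // Package the Flink connector fails to load, per the stack trace in this issue.
    static final String OLD_LOCATION = "org.elasticsearch.common.unit.TimeValue";
    // ASSUMPTION: newer location in later 7.x clients; verify against your client jar.
    static final String NEW_LOCATION = "org.elasticsearch.core.TimeValue";

    /** Reports, for each class name, whether the given class loader can load it. */
    static Map<String, Boolean> probe(ClassLoader loader, String... classNames) {
        Map<String, Boolean> found = new LinkedHashMap<>();
        for (String name : classNames) {
            try {
                // initialize=false: only check visibility, don't run static initializers.
                Class.forName(name, false, loader);
                found.put(name, true);
            } catch (ClassNotFoundException | LinkageError e) {
                found.put(name, false);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        ClassLoader loader = TimeValueProbe.class.getClassLoader();
        probe(loader, OLD_LOCATION, NEW_LOCATION)
            .forEach((name, ok) -> System.out.println(name + " -> " + (ok ? "present" : "MISSING")));
    }
}
```

Since Flink loads job code through its own user-code class loader, passing the class loader of one of your job's classes (rather than the system loader) is more likely to reflect what the connector sees, assuming the connector and client are bundled in the job jar.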
>
> So I thought: fine, what does the documentation say?
>
> If we look at
> [https://github.com/apache/flink/blob/master/docs/content/docs/connectors/datastream/elasticsearch.md],
> it states Elasticsearch 7.5.1.
>
> And if we then look here, we see that the old package for TimeValue is still
> present in the current connector code for Flink:
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L40].
>
> This leads me to this JIRA issue: is the current documentation accurate in
> saying that the connector code in Flink can only reliably be used with
> Elasticsearch up to 7.5.1?
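When checking a deployed cluster version against the version the documentation names, the comparison has to be numeric per component: as plain strings, "7.15.1" sorts before "7.5.1". A minimal sketch, assuming purely numeric dotted versions (no suffixes such as `-SNAPSHOT`); `EsVersionCheck` and `compareVersions` are hypothetical names, and being above the documented version does not by itself prove incompatibility.

```java
public class EsVersionCheck {

    /**
     * Compares dotted numeric versions component by component.
     * Returns a negative number, zero, or a positive number like Comparator.compare.
     */
    static int compareVersions(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            // Missing trailing components count as 0, so "7.5" equals "7.5.0".
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        String documented = "7.5.1"; // version named in the connector docs
        String deployed = "7.15.1";  // cluster version from this issue
        // String.compareTo is lexicographic and reports 7.15.1 as older (negative result).
        System.out.println("lexicographic: " + deployed.compareTo(documented));
        // Numeric comparison correctly reports 7.15.1 as newer (positive result).
        System.out.println("numeric:       " + compareVersions(deployed, documented));
    }
}
```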
>
> What is the "lay of the land" as far as the Flink community and Elastic are
> concerned? Is this an area that's actively being looked into, or is the general
> consensus that one should write their own connector?
>
> Are there plans to increase said version? Does it have to be this for a
> particular reason?
>
> Many thanks in advance. I can say that we've been using the 7.8 client just
> fine, but moving to test with Elasticsearch 7.15 has come with a few
> challenges, so I'm hoping the Flink community can shed some light in this
> direction.
>
> Cheers,
--
This message was sent by Atlassian Jira
(v8.20.1#820001)