[
https://issues.apache.org/jira/browse/FLINK-13025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16936142#comment-16936142
]
Victor commented on FLINK-13025:
--------------------------------
[~yanghua] I've tested the basic ES sink in a live project running Flink 1.8.1
and Elasticsearch 7.3.1.
I had to pull in flink-connector-elasticsearch-base_2.12 version 1.8.1 separately,
since there isn't a 1.10.x version of it yet for the connector you've created.
The basic sink does work fine without specifying the "Type" property when
creating the IndexRequest, as intended:
{code:java}
new IndexRequest(msg.esIndexName)
    .create(false)
    .id(msg.esId)
    .source(msg.esDocumentBody, XContentType.JSON)
{code}
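For context, this is roughly how that request is wired into the sink in my project; a minimal sketch, assuming the ES7 builder mirrors the ES6 one, and Msg / messageStream are names from my own job:
{code:java}
import java.util.Collections;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;

// Build type-less IndexRequests (Msg and its es* fields come from my project).
ElasticsearchSinkFunction<Msg> sinkFunction = (msg, runtimeContext, indexer) ->
    indexer.add(new IndexRequest(msg.esIndexName)
        .create(false)
        .id(msg.esId)
        .source(msg.esDocumentBody, XContentType.JSON));

ElasticsearchSink.Builder<Msg> esSinkBuilder = new ElasticsearchSink.Builder<>(
    Collections.singletonList(new HttpHost("localhost", 9200, "http")),
    sinkFunction);
esSinkBuilder.setBulkFlushMaxActions(1); // flush every record while testing

messageStream.addSink(esSinkBuilder.build()); // messageStream is a DataStream<Msg> in my job
{code}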
I also noticed the API call bridge is now correctly pinging the ES cluster with the
default RequestOptions instead of failing against a 7.3.x ES client as before, so
that's good too, and it lets the Flink job start up fine.
[https://github.com/apache/flink/pull/9720/files#diff-f2f261e881f5618405a827796728a6a4R82]
What I'm not sure about is whether it would be better to restrict, or add validation
against, setting a "Type" name when creating the IndexRequest, since it's no longer
supported. IDEs will show the classic "deprecated" underline, but for new users I
think it's a weird experience to have the connector not complain and then see this
log entry when the job gets executed:
{code:java}
WARNING: request [POST http://localhost:9200/_bulk?timeout=1m] returned 1 warnings: [299 Elasticsearch-7.3.1-4749ba6 "[types removal] Specifying types in bulk requests is deprecated."]
{code}
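Something along these lines is roughly what I had in mind; purely an illustrative sketch, the method name and where it would hook into the connector are made up:
{code:java}
// Hypothetical guard (not part of the PR): reject IndexRequests that still carry a
// custom mapping type before they are handed to the bulk processor.
private static void rejectCustomTypes(org.elasticsearch.action.index.IndexRequest request) {
    String type = request.type(); // deprecated getter, still present on the 7.x client
    if (type != null && !type.isEmpty() && !"_doc".equals(type)) {
        throw new IllegalArgumentException(
            "Elasticsearch 7 no longer supports custom mapping types, but the request uses type '"
                + type + "'. Create the IndexRequest with only the index name.");
    }
}
{code}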
If you think that's out of the scope of the connector, that's fair too, but I do
think the example should then at least omit the use of the "type" property in the
builder:
[https://github.com/apache/flink/pull/9720/files#diff-d131eab50d3d9f225c9e05ccca5a6ea4R139]
The part I haven't used yet and am not sure how to test is the UpsertTableSink.
I'd like to try it out, since I can see we still have "String docType," fields
there, and if a user provides one it might trigger a
"java.lang.IllegalArgumentException" on the ES side.
[https://github.com/apache/flink/pull/9720/files#diff-63da77546b576d0bd0b788d522fdad6fR244]
[https://github.com/apache/flink/pull/9720/files#diff-63da77546b576d0bd0b788d522fdad6fR232]
With a message like this:
{code:java}
elasticsearch | "stacktrace": ["java.lang.IllegalArgumentException: Rejecting mapping update to [my-index] as the final mapping would have more than 1 type: [_doc, mytype]",
elasticsearch | "at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:272) ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:238) ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:687) ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:310) ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:210) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:688) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215) [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch | "at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]",
elasticsearch | "at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]",
{code}
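If I had to guess, exercising that path would look something like the following; an untested sketch, the descriptor API is borrowed from the existing ES6 table connector and I'm assuming the new module registers itself under version "7", so names may be off:
{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Elasticsearch;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register an upsert sink through the descriptor API; documentType("mytype") is the part
// I suspect ES 7 will reject with the IllegalArgumentException above.
tableEnv.connect(new Elasticsearch()
        .version("7")                      // assuming the new connector is registered as version "7"
        .host("localhost", 9200, "http")
        .index("my-index")
        .documentType("mytype"))           // custom type -> expect the mapping rejection
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("userId", Types.STRING)
        .field("cnt", Types.LONG))
    .inUpsertMode()
    .registerTableSink("EsUpsertSink");

// Any grouped aggregation writing into the sink should hit the upsert path
// (assuming a "Clicks" table with a userId column is already registered):
tableEnv.sqlUpdate("INSERT INTO EsUpsertSink SELECT userId, COUNT(*) AS cnt FROM Clicks GROUP BY userId");
{code}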
But as I said, I haven't been able to test that in a live project, so if anyone can
point me to a code example or can test it too, that would be awesome.
I could figure it out myself, but unfortunately I can't commit a lot of time at the moment.
> Elasticsearch 7.x support
> -------------------------
>
> Key: FLINK-13025
> URL: https://issues.apache.org/jira/browse/FLINK-13025
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / ElasticSearch
> Affects Versions: 1.8.0
> Reporter: Keegan Standifer
> Priority: Major
> Labels: pull-request-available
> Attachments: flink-connector-elasticsearch7_2.12-1.10-SNAPSHOT.jar
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Elasticsearch 7.0.0 was released in April of 2019:
> [https://www.elastic.co/blog/elasticsearch-7-0-0-released]
> The latest elasticsearch connector is
> [flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)