[ 
https://issues.apache.org/jira/browse/FLINK-13025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16936142#comment-16936142
 ] 

Victor edited comment on FLINK-13025 at 9/23/19 7:37 PM:
---------------------------------------------------------

[~yanghua] I've tested the basic ES sink in a live project running Flink 
1.8.1 and Elasticsearch 7.3.1.
I had to pull in flink-connector-elasticsearch-base_2.12 version 1.8.1 
separately, since there isn't a 1.10.x version yet for the connector you've 
created.

As intended, the basic sink works fine without specifying the "type" property 
when creating the IndexRequest:
{code:java}
new IndexRequest(msg.esIndexName)
  .create(false)
  .id(msg.esId)
  .source(msg.esDocumentBody, XContentType.JSON){code}
I also noticed that the API bridge now correctly pings the ES cluster with the 
default request options instead of failing with a 7.3.x ES client as before, so 
that's good too, and it allows the Flink job to start cleanly. 
[https://github.com/apache/flink/pull/9720/files#diff-f2f261e881f5618405a827796728a6a4R82]

What I'm not sure about is whether it would be better to restrict or validate 
the request so that a "type" name can no longer be set when creating the 
IndexRequest, since specifying types is deprecated in Elasticsearch 7.x. IDEs 
will show the usual "deprecated" highlighting, but for new users I think it is 
a weird experience to have the connector not complain and then see this log 
entry when the job gets executed:
{code:java}
WARNING: request [POST http://localhost:9200/_bulk?timeout=1m] returned 1 
warnings: [299 Elasticsearch-7.3.1-4749ba6 "[types removal] Specifying types in 
bulk requests is deprecated."]{code}
If you think that's out of scope for the connector, that's fair too, but I do 
think the example should then at least omit the "type" property in the 
builder: 
[https://github.com/apache/flink/pull/9720/files#diff-d131eab50d3d9f225c9e05ccca5a6ea4R139]
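
To make the validation idea concrete, here is a minimal sketch. 
DocTypeValidator and requireNoCustomType are hypothetical names; they exist 
neither in the PR nor in the Elasticsearch client. The sketch only assumes that 
"_doc" is the type name ES 7.x uses for typeless requests, and whether the 
connector should fail hard or just log a warning is an open design choice:

```java
// Hypothetical helper: not part of the Flink connector or the Elasticsearch client.
final class DocTypeValidator {

    // Type name that Elasticsearch 7.x uses implicitly for typeless requests.
    static final String DEFAULT_TYPE = "_doc";

    private DocTypeValidator() {
    }

    /**
     * Returns normally when no custom type is configured (null, empty, or "_doc")
     * and throws an IllegalArgumentException for any other type name.
     */
    static void requireNoCustomType(String docType) {
        if (docType == null || docType.isEmpty() || DEFAULT_TYPE.equals(docType)) {
            return;
        }
        throw new IllegalArgumentException(
            "Document type [" + docType + "] should not be set for Elasticsearch 7.x; "
                + "omit it or use [" + DEFAULT_TYPE + "].");
    }
}
```

Called at the point where a docType is accepted, a check like this would 
surface the problem when the job is built rather than as a deprecation warning 
or a rejected mapping update at runtime.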
The part that I haven't used yet, and am not sure how to test, is the 
UpsertTableSink. I would like to try it out, since I can see we have 
"String docType" fields there, and if a user provides a value for them it might 
trigger a "java.lang.IllegalArgumentException" on the ES side. 
[https://github.com/apache/flink/pull/9720/files#diff-63da77546b576d0bd0b788d522fdad6fR244]
[https://github.com/apache/flink/pull/9720/files#diff-63da77546b576d0bd0b788d522fdad6fR232]

With a message like this:
{code:java}
elasticsearch     | "stacktrace": ["java.lang.IllegalArgumentException: 
Rejecting mapping update to [my-index] as the final mapping would have more 
than 1 type: [_doc, mytype]",
elasticsearch     | "at 
org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:272)
 ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:238)
 ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:687)
 ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:310)
 ~[elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:210)
 [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142)
 [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150)
 [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188)
 [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:688)
 [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252)
 [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215)
 [elasticsearch-7.3.1.jar:7.3.1]",
elasticsearch     | "at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]",
elasticsearch     | "at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]",{code}
But as I said, I'm not sure how to test that in a live project, so if anyone can 
point me to a code example or can test it too, that would be awesome.
I'd figure it out myself, but unfortunately I can't commit a lot of time at the 
moment.
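
For anyone who wants to reason about that rejection without a cluster, here is 
a rough model of the rule behind the message above: once an index has a mapping 
type, a write using a different type name is refused. SingleTypeIndexModel is a 
hypothetical illustration, not Elasticsearch code, and the real check in 
MetaDataMappingService is certainly more involved:

```java
// Illustrative model only: this is not Elasticsearch code.
final class SingleTypeIndexModel {

    private final String indexName;

    // Stays null until the first write fixes the index's single mapping type.
    private String mappingType;

    SingleTypeIndexModel(String indexName) {
        this.indexName = indexName;
    }

    /** Accepts a write for the given type, or rejects it if it would add a second type. */
    void index(String requestedType) {
        if (mappingType == null) {
            mappingType = requestedType;
            return;
        }
        if (!mappingType.equals(requestedType)) {
            throw new IllegalArgumentException(
                "Rejecting mapping update to [" + indexName
                    + "] as the final mapping would have more than 1 type: ["
                    + mappingType + ", " + requestedType + "]");
        }
    }
}
```

In this model, writing to "my-index" with "_doc" first and then with "mytype" 
throws, which is the situation a user-supplied docType could run into.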
  



> Elasticsearch 7.x support
> -------------------------
>
>                 Key: FLINK-13025
>                 URL: https://issues.apache.org/jira/browse/FLINK-13025
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.8.0
>            Reporter: Keegan Standifer
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flink-connector-elasticsearch7_2.12-1.10-SNAPSHOT.jar
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Elasticsearch 7.0.0 was released in April of 2019: 
> [https://www.elastic.co/blog/elasticsearch-7-0-0-released]
> The latest elasticsearch connector is 
> [flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
