[
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555248#comment-16555248
]
ASF GitHub Bot commented on FLINK-9885:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6391#discussion_r204993234
--- Diff:
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import
org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+ /**
+ * Tests that the Elasticsearch sink works properly using a {@link
RestHighLevelClient}.
+ */
+ public void runTransportClientTest() throws Exception {
+ final String index = "transport-client-test-index";
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<Tuple2<Integer, String>> source =
env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+ Map<String, String> userConfig = new HashMap<>();
+ // This instructs the sink to emit after every element,
otherwise they would be buffered
+
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+
source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
+ new
SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+
+ env.execute("Elasticsearch RestHighLevelClient Test");
+
+ // verify the results
+ Client client = embeddedNodeEnv.getClient();
+ SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+
+ client.close();
+ }
+
+ /**
+ * Tests that the Elasticsearch sink fails eagerly if the provided list
of transport addresses is {@code null}.
+ */
+ public void runNullTransportClientTest() throws Exception {
+ try {
+ Map<String, String> userConfig = new HashMap<>();
+
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+ createElasticsearchSink6(userConfig, null, new
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+ } catch (IllegalArgumentException expectedException) {
+ // test passes
+ return;
+ }
+
+ fail();
+ }
+
+ /**
+ * Tests that the Elasticsearch sink fails eagerly if the provided list
of transport addresses is empty.
+ */
+ public void runEmptyTransportClientTest() throws Exception {
+ try {
+ Map<String, String> userConfig = new HashMap<>();
+
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+ createElasticsearchSink6(userConfig,
+ Collections.<HttpHost>emptyList(),
+ new
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+ } catch (IllegalArgumentException expectedException) {
+ // test passes
+ return;
+ }
+
+ fail();
+ }
+
+ /**
+ * Tests whether the Elasticsearch sink fails when there is no cluster
to connect to.
+ */
+ public void runTransportClientFailsTest() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<Tuple2<Integer, String>> source =
env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+ Map<String, String> userConfig = new HashMap<>();
+ // This instructs the sink to emit after every element,
otherwise they would be buffered
+
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+
source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
+ new
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+
+ try {
+ env.execute("Elasticsearch Transport Client Test");
+ } catch (JobExecutionException expectedException) {
+
assertTrue(expectedException.getCause().getMessage().contains("not connected to
any Elasticsearch nodes"));
+ return;
+ }
+
+ fail();
+ }
+
+ @Override
+ protected <T> ElasticsearchSinkBase<T>
createElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress>
transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+ return null;
+ }
+
+ @Override
+ protected <T> ElasticsearchSinkBase<T>
createElasticsearchSinkForEmbeddedNode(Map<String, String> userConfig,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+ ArrayList<HttpHost> httpHosts = new ArrayList<>();
+ httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+ return new ElasticsearchSink<>(userConfig, httpHosts,
elasticsearchSinkFunction);
+ }
+
+ private <T> ElasticsearchSinkBase<T> createElasticsearchSink6(
--- End diff --
Merge this function with `createElasticsearchSink`?
> End-to-end test: Elasticsearch 6.x connector
> --------------------------------------------
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
> Issue Type: Sub-task
> Components: ElasticSearch Connector, Tests
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This
> should also come with an end-to-end test that covers this.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)