[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821606#comment-15821606
 ] 

ASF GitHub Bot commented on FLINK-4988:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3112#discussion_r95974130
  
    --- Diff: 
flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
 ---
    @@ -17,217 +17,51 @@
      */
     package org.apache.flink.streaming.connectors.elasticsearch2;
     
    -import org.apache.flink.api.common.functions.RuntimeContext;
    -import org.apache.flink.api.java.tuple.Tuple2;
    -import org.apache.flink.runtime.client.JobExecutionException;
    -import org.apache.flink.streaming.api.datastream.DataStreamSource;
    -import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    -import org.apache.flink.streaming.api.functions.source.SourceFunction;
    -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    -import org.elasticsearch.action.get.GetRequest;
    -import org.elasticsearch.action.get.GetResponse;
    -import org.elasticsearch.action.index.IndexRequest;
    -import org.elasticsearch.client.Client;
    -import org.elasticsearch.client.Requests;
    -import org.elasticsearch.common.settings.Settings;
    -import org.elasticsearch.node.Node;
    -import org.elasticsearch.node.NodeBuilder;
    -import org.junit.Assert;
    -import org.junit.ClassRule;
    +import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
    +import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    +import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
     import org.junit.Test;
    -import org.junit.rules.TemporaryFolder;
     
    -import java.io.File;
     import java.net.InetAddress;
     import java.net.InetSocketAddress;
     import java.util.ArrayList;
    -import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
     
    -public class ElasticsearchSinkITCase extends 
StreamingMultipleProgramsTestBase {
    -
    -   private static final int NUM_ELEMENTS = 20;
    -
    -   @ClassRule
    -   public static TemporaryFolder tempFolder = new TemporaryFolder();
    +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
     
        @Test
        public void testTransportClient() throws Exception {
    -
    -           File dataDir = tempFolder.newFolder();
    -
    -           Node node = NodeBuilder.nodeBuilder()
    -                           .settings(Settings.settingsBuilder()
    -                                           .put("path.home", 
dataDir.getParent())
    -                                           .put("http.enabled", false)
    -                                           .put("path.data", 
dataDir.getAbsolutePath()))
    -                           // set a custom cluster name to verify that 
user config works correctly
    -                           .clusterName("my-transport-client-cluster")
    -                           .node();
    -
    -           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    -
    -           DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
    -
    -           Map<String, String> config = new HashMap<>();
    -           // This instructs the sink to emit after every element, 
otherwise they would be buffered
    -           config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
    -           config.put("cluster.name", "my-transport-client-cluster");
    -
    -           // Can't use {@link TransportAddress} as its not Serializable 
in Elasticsearch 2.x
    -           List<InetSocketAddress> transports = new ArrayList<>();
    -           transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
    -
    -           source.addSink(new ElasticsearchSink<>(config, transports, new 
TestElasticsearchSinkFunction()));
    -
    -           env.execute("Elasticsearch TransportClient Test");
    -
    -           // verify the results
    -           Client client = node.client();
    -           for (int i = 0; i < NUM_ELEMENTS; i++) {
    -                   GetResponse response = client.get(new 
GetRequest("my-index",
    -                                   "my-type", 
Integer.toString(i))).actionGet();
    -                   Assert.assertEquals("message #" + i, 
response.getSource().get("data"));
    -           }
    -
    -           node.close();
    +           runTransportClientTest();
        }
     
    - @Test(expected = IllegalArgumentException.class)
    - public void testNullTransportClient() throws Exception {
    -
    -   File dataDir = tempFolder.newFolder();
    -
    -   Node node = NodeBuilder.nodeBuilder()
    -           .settings(Settings.settingsBuilder()
    -                   .put("path.home", dataDir.getParent())
    -                   .put("http.enabled", false)
    -                   .put("path.data", dataDir.getAbsolutePath()))
    -           // set a custom cluster name to verify that user config works 
correctly
    -           .clusterName("my-transport-client-cluster")
    -           .node();
    -
    -   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    -
    -   DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction());
    -
    -   Map<String, String> config = new HashMap<>();
    -   // This instructs the sink to emit after every element, otherwise they 
would be buffered
    -   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
    -   config.put("cluster.name", "my-transport-client-cluster");
    -
    -   source.addSink(new ElasticsearchSink<>(config, null, new 
TestElasticsearchSinkFunction()));
    -
    -   env.execute("Elasticsearch TransportClient Test");
    -
    -   // verify the results
    -   Client client = node.client();
    -   for (int i = 0; i < NUM_ELEMENTS; i++) {
    -    GetResponse response = client.get(new GetRequest("my-index",
    -            "my-type", Integer.toString(i))).actionGet();
    -    Assert.assertEquals("message #" + i, response.getSource().get("data"));
    +   @Test
    +   public void testNullTransportClient() throws Exception {
    +           runNullTransportClientTest();
        }
     
    -   node.close();
    - }
    -
    - @Test(expected = IllegalArgumentException.class)
    - public void testEmptyTransportClient() throws Exception {
    -
    -   File dataDir = tempFolder.newFolder();
    -
    -   Node node = NodeBuilder.nodeBuilder()
    -           .settings(Settings.settingsBuilder()
    -                   .put("path.home", dataDir.getParent())
    -                   .put("http.enabled", false)
    -                   .put("path.data", dataDir.getAbsolutePath()))
    -           // set a custom cluster name to verify that user config works 
correctly
    -           .clusterName("my-transport-client-cluster")
    -           .node();
    -
    -   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    -
    -   DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction());
    -
    -   Map<String, String> config = new HashMap<>();
    -   // This instructs the sink to emit after every element, otherwise they 
would be buffered
    -   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
    -   config.put("cluster.name", "my-transport-client-cluster");
    -
    -   source.addSink(new ElasticsearchSink<>(config, new 
ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
    -
    -   env.execute("Elasticsearch TransportClient Test");
    -
    -   // verify the results
    -   Client client = node.client();
    -   for (int i = 0; i < NUM_ELEMENTS; i++) {
    -    GetResponse response = client.get(new GetRequest("my-index",
    -            "my-type", Integer.toString(i))).actionGet();
    -    Assert.assertEquals("message #" + i, response.getSource().get("data"));
    +   @Test
    +   public void testEmptyTransportClient() throws Exception {
    +           runEmptyTransportClientTest();
        }
     
    -   node.close();
    - }
    -
    -   @Test(expected = JobExecutionException.class)
    +   @Test
        public void testTransportClientFails() throws Exception{
    -           // this checks whether the TransportClient fails early when 
there is no cluster to
    -           // connect to. There isn't a similar test for the Node Client 
version since that
    -           // one will block and wait for a cluster to come online
    -
    -           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    -
    -           DataStreamSource<Tuple2<Integer, String>> source = 
env.addSource(new TestSourceFunction());
    -
    -           Map<String, String> config = new HashMap<>();
    -           // This instructs the sink to emit after every element, 
otherwise they would be buffered
    -           config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, 
"1");
    -           config.put("cluster.name", "my-node-client-cluster");
    -
    -           List<InetSocketAddress> transports = new ArrayList<>();
    -           transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
    -
    -           source.addSink(new ElasticsearchSink<>(config, transports, new 
TestElasticsearchSinkFunction()));
    -
    -           env.execute("Elasticsearch Node Client Test");
    +           runTransportClientFailsTest();
        }
     
    -   private static class TestSourceFunction implements 
SourceFunction<Tuple2<Integer, String>> {
    -           private static final long serialVersionUID = 1L;
    -
    -           private volatile boolean running = true;
    -
    -           @Override
    -           public void run(SourceContext<Tuple2<Integer, String>> ctx) 
throws Exception {
    -                   for (int i = 0; i < NUM_ELEMENTS && running; i++) {
    -                           ctx.collect(Tuple2.of(i, "message #" + i));
    -                   }
    -           }
    -
    -           @Override
    -           public void cancel() {
    -                   running = false;
    -           }
    +   @Override
    +   protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSink(Map<String, String> userConfig,
    +                                                                           
                                                List<InetSocketAddress> 
transportAddresses,
    +                                                                           
                                                ElasticsearchSinkFunction<T> 
elasticsearchSinkFunction) {
    +           return new ElasticsearchSink<>(userConfig, transportAddresses, 
elasticsearchSinkFunction);
    --- End diff --
    
    Whitespace


> Elasticsearch 5.x support
> -------------------------
>
>                 Key: FLINK-4988
>                 URL: https://issues.apache.org/jira/browse/FLINK-4988
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to