GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3112#discussion_r95974130
  
    --- Diff: flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java ---
    @@ -17,217 +17,51 @@
      */
     package org.apache.flink.streaming.connectors.elasticsearch2;
     
    -import org.apache.flink.api.common.functions.RuntimeContext;
    -import org.apache.flink.api.java.tuple.Tuple2;
    -import org.apache.flink.runtime.client.JobExecutionException;
    -import org.apache.flink.streaming.api.datastream.DataStreamSource;
    -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    -import org.apache.flink.streaming.api.functions.source.SourceFunction;
    -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    -import org.elasticsearch.action.get.GetRequest;
    -import org.elasticsearch.action.get.GetResponse;
    -import org.elasticsearch.action.index.IndexRequest;
    -import org.elasticsearch.client.Client;
    -import org.elasticsearch.client.Requests;
    -import org.elasticsearch.common.settings.Settings;
    -import org.elasticsearch.node.Node;
    -import org.elasticsearch.node.NodeBuilder;
    -import org.junit.Assert;
    -import org.junit.ClassRule;
    +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
    +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
     import org.junit.Test;
    -import org.junit.rules.TemporaryFolder;
     
    -import java.io.File;
     import java.net.InetAddress;
     import java.net.InetSocketAddress;
     import java.util.ArrayList;
    -import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
     
    -public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
    -
    -   private static final int NUM_ELEMENTS = 20;
    -
    -   @ClassRule
    -   public static TemporaryFolder tempFolder = new TemporaryFolder();
    +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
     
        @Test
        public void testTransportClient() throws Exception {
    -
    -           File dataDir = tempFolder.newFolder();
    -
    -           Node node = NodeBuilder.nodeBuilder()
    -                           .settings(Settings.settingsBuilder()
    -                                           .put("path.home", dataDir.getParent())
    -                                           .put("http.enabled", false)
    -                                           .put("path.data", dataDir.getAbsolutePath()))
    -                           // set a custom cluster name to verify that user config works correctly
    -                           .clusterName("my-transport-client-cluster")
    -                           .node();
    -
    -           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    -
    -           DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
    -
    -           Map<String, String> config = new HashMap<>();
    -           // This instructs the sink to emit after every element, otherwise they would be buffered
    -           config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
    -           config.put("cluster.name", "my-transport-client-cluster");
    -
    -           // Can't use {@link TransportAddress} as it's not Serializable in Elasticsearch 2.x
    -           List<InetSocketAddress> transports = new ArrayList<>();
    -           transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
    -
    -           source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction()));
    -
    -           env.execute("Elasticsearch TransportClient Test");
    -
    -           // verify the results
    -           Client client = node.client();
    -           for (int i = 0; i < NUM_ELEMENTS; i++) {
    -                   GetResponse response = client.get(new GetRequest("my-index",
    -                                   "my-type", Integer.toString(i))).actionGet();
    -                   Assert.assertEquals("message #" + i, response.getSource().get("data"));
    -           }
    -
    -           node.close();
    +           runTransportClientTest();
        }
     
    - @Test(expected = IllegalArgumentException.class)
    - public void testNullTransportClient() throws Exception {
    -
    -   File dataDir = tempFolder.newFolder();
    -
    -   Node node = NodeBuilder.nodeBuilder()
    -           .settings(Settings.settingsBuilder()
    -                   .put("path.home", dataDir.getParent())
    -                   .put("http.enabled", false)
    -                   .put("path.data", dataDir.getAbsolutePath()))
    -           // set a custom cluster name to verify that user config works correctly
    -           .clusterName("my-transport-client-cluster")
    -           .node();
    -
    -   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    -
    -   DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
    -
    -   Map<String, String> config = new HashMap<>();
    -   // This instructs the sink to emit after every element, otherwise they would be buffered
    -   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
    -   config.put("cluster.name", "my-transport-client-cluster");
    -
    -   source.addSink(new ElasticsearchSink<>(config, null, new TestElasticsearchSinkFunction()));
    -
    -   env.execute("Elasticsearch TransportClient Test");
    -
    -   // verify the results
    -   Client client = node.client();
    -   for (int i = 0; i < NUM_ELEMENTS; i++) {
    -    GetResponse response = client.get(new GetRequest("my-index",
    -            "my-type", Integer.toString(i))).actionGet();
    -    Assert.assertEquals("message #" + i, response.getSource().get("data"));
    +   @Test
    +   public void testNullTransportClient() throws Exception {
    +           runNullTransportClientTest();
        }
     
    -   node.close();
    - }
    -
    - @Test(expected = IllegalArgumentException.class)
    - public void testEmptyTransportClient() throws Exception {
    -
    -   File dataDir = tempFolder.newFolder();
    -
    -   Node node = NodeBuilder.nodeBuilder()
    -           .settings(Settings.settingsBuilder()
    -                   .put("path.home", dataDir.getParent())
    -                   .put("http.enabled", false)
    -                   .put("path.data", dataDir.getAbsolutePath()))
    -           // set a custom cluster name to verify that user config works correctly
    -           .clusterName("my-transport-client-cluster")
    -           .node();
    -
    -   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    -
    -   DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
    -
    -   Map<String, String> config = new HashMap<>();
    -   // This instructs the sink to emit after every element, otherwise they would be buffered
    -   config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
    -   config.put("cluster.name", "my-transport-client-cluster");
    -
    -   source.addSink(new ElasticsearchSink<>(config, new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
    -
    -   env.execute("Elasticsearch TransportClient Test");
    -
    -   // verify the results
    -   Client client = node.client();
    -   for (int i = 0; i < NUM_ELEMENTS; i++) {
    -    GetResponse response = client.get(new GetRequest("my-index",
    -            "my-type", Integer.toString(i))).actionGet();
    -    Assert.assertEquals("message #" + i, response.getSource().get("data"));
    +   @Test
    +   public void testEmptyTransportClient() throws Exception {
    +           runEmptyTransportClientTest();
        }
     
    -   node.close();
    - }
    -
    -   @Test(expected = JobExecutionException.class)
    +   @Test
        public void testTransportClientFails() throws Exception{
    -           // this checks whether the TransportClient fails early when there is no cluster to
    -           // connect to. There isn't a similar test for the Node Client version since that
    -           // one will block and wait for a cluster to come online
    -
    -           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    -
    -           DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
    -
    -           Map<String, String> config = new HashMap<>();
    -           // This instructs the sink to emit after every element, otherwise they would be buffered
    -           config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
    -           config.put("cluster.name", "my-node-client-cluster");
    -
    -           List<InetSocketAddress> transports = new ArrayList<>();
    -           transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
    -
    -           source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction()));
    -
    -           env.execute("Elasticsearch Node Client Test");
    +           runTransportClientFailsTest();
        }
     
    -   private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
    -           private static final long serialVersionUID = 1L;
    -
    -           private volatile boolean running = true;
    -
    -           @Override
    -           public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
    -                   for (int i = 0; i < NUM_ELEMENTS && running; i++) {
    -                           ctx.collect(Tuple2.of(i, "message #" + i));
    -                   }
    -           }
    -
    -           @Override
    -           public void cancel() {
    -                   running = false;
    -           }
    +   @Override
    +   protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
    +                                                                   List<InetSocketAddress> transportAddresses,
    +                                                                   ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
    +           return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
    --- End diff ---
    
    Whitespace: the continuation-line indentation of these wrapped parameters is inconsistent.
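    
    A minimal sketch of one possible reformatting (the signature and parameter names come from the diff above; the single-extra-indent continuation style is only a suggestion, not a stated project rule):
    
        @Override
        protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(
                Map<String, String> userConfig,
                List<InetSocketAddress> transportAddresses,
                ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
            // Same behavior as in the diff: delegate to the concrete
            // Elasticsearch 2.x sink; only the indentation changes.
            return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
        }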

