Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/3112#discussion_r95974130
--- Diff:
flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
---
@@ -17,217 +17,51 @@
*/
package org.apache.flink.streaming.connectors.elasticsearch2;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ElasticsearchSinkITCase extends
StreamingMultipleProgramsTestBase {
-
- private static final int NUM_ELEMENTS = 20;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
@Test
public void testTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = NodeBuilder.nodeBuilder()
- .settings(Settings.settingsBuilder()
- .put("path.home",
dataDir.getParent())
- .put("http.enabled", false)
- .put("path.data",
dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that
user config works correctly
- .clusterName("my-transport-client-cluster")
- .node();
-
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source =
env.addSource(new TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element,
otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
"1");
- config.put("cluster.name", "my-transport-client-cluster");
-
- // Can't use {@link TransportAddress} as its not Serializable
in Elasticsearch 2.x
- List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new
TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch TransportClient Test");
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new
GetRequest("my-index",
- "my-type",
Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i,
response.getSource().get("data"));
- }
-
- node.close();
+ runTransportClientTest();
}
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = NodeBuilder.nodeBuilder()
- .settings(Settings.settingsBuilder()
- .put("path.home", dataDir.getParent())
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works
correctly
- .clusterName("my-transport-client-cluster")
- .node();
-
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new
TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they
would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-transport-client-cluster");
-
- source.addSink(new ElasticsearchSink<>(config, null, new
TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch TransportClient Test");
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type", Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
+ @Test
+ public void testNullTransportClient() throws Exception {
+ runNullTransportClientTest();
}
- node.close();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testEmptyTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = NodeBuilder.nodeBuilder()
- .settings(Settings.settingsBuilder()
- .put("path.home", dataDir.getParent())
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works
correctly
- .clusterName("my-transport-client-cluster")
- .node();
-
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new
TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they
would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-transport-client-cluster");
-
- source.addSink(new ElasticsearchSink<>(config, new
ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch TransportClient Test");
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type", Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
+ @Test
+ public void testEmptyTransportClient() throws Exception {
+ runEmptyTransportClientTest();
}
- node.close();
- }
-
- @Test(expected = JobExecutionException.class)
+ @Test
public void testTransportClientFails() throws Exception{
- // this checks whether the TransportClient fails early when
there is no cluster to
- // connect to. There isn't a similar test for the Node Client
version since that
- // one will block and wait for a cluster to come online
-
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source =
env.addSource(new TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element,
otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
"1");
- config.put("cluster.name", "my-node-client-cluster");
-
- List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new
TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch Node Client Test");
+ runTransportClientFailsTest();
}
- private static class TestSourceFunction implements
SourceFunction<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, String>> ctx)
throws Exception {
- for (int i = 0; i < NUM_ELEMENTS && running; i++) {
- ctx.collect(Tuple2.of(i, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
+ @Override
+ protected <T> ElasticsearchSinkBase<T>
createElasticsearchSink(Map<String, String> userConfig,
+
List<InetSocketAddress>
transportAddresses,
+
ElasticsearchSinkFunction<T>
elasticsearchSinkFunction) {
+ return new ElasticsearchSink<>(userConfig, transportAddresses,
elasticsearchSinkFunction);
--- End diff --
Whitespace
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---