[
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821643#comment-15821643
]
ASF GitHub Bot commented on FLINK-4988:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3112#discussion_r95978090
--- Diff:
flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
---
@@ -17,217 +17,51 @@
*/
package org.apache.flink.streaming.connectors.elasticsearch2;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.junit.Assert;
-import org.junit.ClassRule;
+import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ElasticsearchSinkITCase extends
StreamingMultipleProgramsTestBase {
-
- private static final int NUM_ELEMENTS = 20;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
@Test
public void testTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = NodeBuilder.nodeBuilder()
- .settings(Settings.settingsBuilder()
- .put("path.home",
dataDir.getParent())
- .put("http.enabled", false)
- .put("path.data",
dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that
user config works correctly
- .clusterName("my-transport-client-cluster")
- .node();
-
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source =
env.addSource(new TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element,
otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
"1");
- config.put("cluster.name", "my-transport-client-cluster");
-
- // Can't use {@link TransportAddress} as its not Serializable
in Elasticsearch 2.x
- List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new
TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch TransportClient Test");
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new
GetRequest("my-index",
- "my-type",
Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i,
response.getSource().get("data"));
- }
-
- node.close();
+ runTransportClientTest();
}
- @Test(expected = IllegalArgumentException.class)
- public void testNullTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = NodeBuilder.nodeBuilder()
- .settings(Settings.settingsBuilder()
- .put("path.home", dataDir.getParent())
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works
correctly
- .clusterName("my-transport-client-cluster")
- .node();
-
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new
TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they
would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-transport-client-cluster");
-
- source.addSink(new ElasticsearchSink<>(config, null, new
TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch TransportClient Test");
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type", Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
+ @Test
+ public void testNullTransportClient() throws Exception {
+ runNullTransportClientTest();
}
- node.close();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testEmptyTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
-
- Node node = NodeBuilder.nodeBuilder()
- .settings(Settings.settingsBuilder()
- .put("path.home", dataDir.getParent())
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works
correctly
- .clusterName("my-transport-client-cluster")
- .node();
-
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new
TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they
would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-transport-client-cluster");
-
- source.addSink(new ElasticsearchSink<>(config, new
ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch TransportClient Test");
-
- // verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type", Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
+ @Test
+ public void testEmptyTransportClient() throws Exception {
+ runEmptyTransportClientTest();
}
- node.close();
- }
-
- @Test(expected = JobExecutionException.class)
+ @Test
public void testTransportClientFails() throws Exception{
- // this checks whether the TransportClient fails early when
there is no cluster to
- // connect to. There isn't a similar test for the Node Client
version since that
- // one will block and wait for a cluster to come online
-
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source =
env.addSource(new TestSourceFunction());
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element,
otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
"1");
- config.put("cluster.name", "my-node-client-cluster");
-
- List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
- source.addSink(new ElasticsearchSink<>(config, transports, new
TestElasticsearchSinkFunction()));
-
- env.execute("Elasticsearch Node Client Test");
+ runTransportClientFailsTest();
}
- private static class TestSourceFunction implements
SourceFunction<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, String>> ctx)
throws Exception {
- for (int i = 0; i < NUM_ELEMENTS && running; i++) {
- ctx.collect(Tuple2.of(i, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
+ @Override
+ protected <T> ElasticsearchSinkBase<T>
createElasticsearchSink(Map<String, String> userConfig,
+
List<InetSocketAddress>
transportAddresses,
+
ElasticsearchSinkFunction<T>
elasticsearchSinkFunction) {
+ return new ElasticsearchSink<>(userConfig, transportAddresses,
elasticsearchSinkFunction);
--- End diff --
Same as the comment above: there don't seem to be any trailing spaces here.
If you meant the lines that look empty, they actually contain method parameters
aligned far to the right.
> Elasticsearch 5.x support
> -------------------------
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
> Issue Type: New Feature
> Reporter: Mike Dias
>
> Elasticsearch 5.x was released:
> https://www.elastic.co/blog/elasticsearch-5-0-0-released
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)