[FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to end test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7abfcb2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7abfcb2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7abfcb2 Branch: refs/heads/release-1.5 Commit: a7abfcb278d2ef35c3b730c5d238cf32c6094674 Parents: 7f9e4c0 Author: zhangminglei <[email protected]> Authored: Tue May 22 09:23:13 2018 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 16:50:26 2018 +0800 ---------------------------------------------------------------------- .../examples/ElasticsearchSinkExample.java | 85 ----------- .../examples/ElasticsearchSinkExample.java | 81 ---------- .../examples/ElasticsearchSinkExample.java | 83 ----------- .../flink-elasticsearch1-test/pom.xml | 117 +++++++++++++++ .../tests/Elasticsearch1SinkExample.java | 93 ++++++++++++ .../flink-elasticsearch2-test/pom.xml | 135 +++++++++++++++++ .../tests/Elasticsearch2SinkExample.java | 92 ++++++++++++ .../flink-elasticsearch5-test/pom.xml | 148 +++++++++++++++++++ .../tests/Elasticsearch5SinkExample.java | 92 ++++++++++++ flink-end-to-end-tests/pom.xml | 3 + .../test-scripts/elasticsearch-common.sh | 62 ++++++++ .../test_streaming_elasticsearch125.sh | 109 ++++++++++++++ 12 files changed, 851 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java 
b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java deleted file mode 100644 index 8a0321d..0000000 --- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.elasticsearch.examples; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink; -import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; -import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; - -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.transport.TransportAddress; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the cluster name in the config map. 
- */ -@SuppressWarnings("serial") -public class ElasticsearchSinkExample { - - public static void main(String[] args) throws Exception { - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); - - Map<String, String> userConfig = new HashMap<>(); - userConfig.put("cluster.name", "elasticsearch"); - // This instructs the sink to emit after every element, otherwise they would be buffered - userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - List<TransportAddress> transports = new ArrayList<>(); - transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() { - @Override - public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } - })); - - env.execute("Elasticsearch Sink Example"); - } - - private static IndexRequest createIndexRequest(String element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element) - .source(json); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java 
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java deleted file mode 100644 index c963927..0000000 --- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchSinkExample.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.elasticsearch2.examples; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink; - -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This example shows how to use the Elasticsearch Sink. 
Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. - */ -public class ElasticsearchSinkExample { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); - - Map<String, String> userConfig = new HashMap<>(); - userConfig.put("cluster.name", "elasticsearch"); - // This instructs the sink to emit after every element, otherwise they would be buffered - userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(userConfig, transports, new org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<String>(){ - @Override - public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } - })); - - env.execute("Elasticsearch Sink Example"); - } - - private static IndexRequest createIndexRequest(String element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element) - .source(json); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java deleted file mode 100644 index 22c1053..0000000 --- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/examples/ElasticsearchSinkExample.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.elasticsearch5.examples; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; -import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; -import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink; - -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. 
- */ -public class ElasticsearchSinkExample { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); - - Map<String, String> userConfig = new HashMap<>(); - userConfig.put("cluster.name", "elasticsearch"); - // This instructs the sink to emit after every element, otherwise they would be buffered - userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() { - @Override - public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } - })); - - env.execute("Elasticsearch Sink Example"); - } - - private static IndexRequest createIndexRequest(String element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element) - .source(json); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml new file mode 100644 index 0000000..1960f05 --- /dev/null +++ b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml @@ -0,0 +1,117 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.5-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-elasticsearch1-test_${scala.binary.version}</artifactId> + <name>flink-elasticsearch1-test</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <!-- Elasticsearch1Sink end to end example --> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + 
<configuration> + <minimizeJar>true</minimizeJar> + <artifactSet> + <excludes> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> + <exclude>log4j:*</exclude> + </excludes> + </artifactSet> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass> + </transformer> + </transformers> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + + <!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch1.sh scripts--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <id>rename</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <copy file="${project.basedir}/target/flink-elasticsearch1-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch1SinkExample.jar" /> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java new file mode 100644 index 0000000..bfdb806 --- /dev/null +++ 
b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * End to end test for Elasticsearch1Sink. 
+ */ +public class Elasticsearch1SinkExample { + public static void main(String[] args) throws Exception { + + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + if (parameterTool.getNumberOfParameters() < 2) { + System.out.println("Missing parameters!\n" + + "Usage: --index <index> --type <type>"); + return; + } + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + env.enableCheckpointing(5000); + + DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message # " + value; + } + }); + + Map<String, String> userConfig = new HashMap<>(); + userConfig.put("cluster.name", "elasticsearch"); + // This instructs the sink to emit after every element, otherwise they would be buffered + userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<TransportAddress> transports = new ArrayList<>(); + transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() { + @Override + public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(createIndexRequest(element, parameterTool)); + } + })); + + env.execute("Elasticsearch1.x end to end sink test example"); + } + + private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index(parameterTool.getRequired("index")) + .type(parameterTool.getRequired("type")) + .id(element) + .source(json); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml ---------------------------------------------------------------------- diff 
--git a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml new file mode 100644 index 0000000..4fd93de --- /dev/null +++ b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.5-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-elasticsearch2-test_${scala.binary.version}</artifactId> + <name>flink-elasticsearch2-test</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <!-- Remove elasticsearch1.7.1 --> + <exclusions> + <exclusion> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>2.3.5</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <!-- Elasticsearch2Sink end to end example --> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <minimizeJar>true</minimizeJar> + <artifactSet> + <excludes> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> 
+ <exclude>log4j:*</exclude> + </excludes> + </artifactSet> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass> + </transformer> + </transformers> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + + <!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch2.sh scripts--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <id>rename</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <copy file="${project.basedir}/target/flink-elasticsearch2-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch2SinkExample.jar" /> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java new file mode 100644 index 0000000..4ec03aa --- /dev/null +++ b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java @@ -0,0 +1,92 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * End to end test for Elasticsearch2Sink. 
+ */ +public class Elasticsearch2SinkExample { + + public static void main(String[] args) throws Exception { + + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + if (parameterTool.getNumberOfParameters() < 2) { + System.out.println("Missing parameters!\n" + + "Usage: --index <index> --type <type>"); + return; + } + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + env.enableCheckpointing(5000); + + DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); + + Map<String, String> userConfig = new HashMap<>(); + userConfig.put("cluster.name", "elasticsearch"); + // This instructs the sink to emit after every element, otherwise they would be buffered + userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>(){ + @Override + public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) { + indexer.add(createIndexRequest(element, parameterTool)); + } + })); + + env.execute("Elasticsearch2.x end to end sink test example"); + } + + private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index(parameterTool.getRequired("index")) + .type(parameterTool.getRequired("type")) + .id(element) + .source(json); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml 
---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml new file mode 100644 index 0000000..3a1e734 --- /dev/null +++ b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml @@ -0,0 +1,148 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.5-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-elasticsearch5-test_${scala.binary.version}</artifactId> + <name>flink-elasticsearch5-test</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- Remove elasticsearch1.7.1 --> + <exclusion> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Dependency for Elasticsearch 5.x Java Client --> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>transport</artifactId> + <version>5.1.2</version> + </dependency> + + <!-- + Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making + Log4j2 a strict dependency. The following is added so that the Log4j2 API in + Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible + in the logging implementation preferred. 
+ --> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-to-slf4j</artifactId> + <version>2.7</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <!-- Elasticsearch5Sink end to end example --> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <minimizeJar>true</minimizeJar> + <artifactSet> + <excludes> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> + <exclude>log4j:*</exclude> + </excludes> + </artifactSet> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.tests.Elasticsearch5SinkExample</mainClass> + </transformer> + </transformers> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + + <!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch5.sh scripts--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <id>rename</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <copy file="${project.basedir}/target/flink-elasticsearch5-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch5SinkExample.jar" /> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> 
http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java new file mode 100644 index 0000000..285f902 --- /dev/null +++ b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * End to end test for Elasticsearch5Sink. + */ +public class Elasticsearch5SinkExample { + public static void main(String[] args) throws Exception { + + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + if (parameterTool.getNumberOfParameters() < 2) { + System.out.println("Missing parameters!\n" + + "Usage: --index <index> --type <type>"); + return; + } + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + env.enableCheckpointing(5000); + + DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); + + Map<String, String> userConfig = new HashMap<>(); + userConfig.put("cluster.name", "elasticsearch"); + // This instructs the sink to emit after every element, otherwise they would be buffered + userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() { + @Override + public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(createIndexRequest(element, parameterTool)); + } + })); + + env.execute("Elasticsearch5.x end to end sink test example"); + } + + private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index(parameterTool.getRequired("index")) + .type(parameterTool.getRequired("type")) + .id(element) + .source(json); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 45b63f0..04b8532 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -43,6 +43,9 @@ under the License. 
<module>flink-high-parallelism-iterations-test</module> <module>flink-stream-stateful-job-upgrade-test</module> <module>flink-local-recovery-and-allocation-test</module> + <module>flink-elasticsearch1-test</module> + <module>flink-elasticsearch2-test</module> + <module>flink-elasticsearch5-test</module> </modules> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh new file mode 100644 index 0000000..3fda344 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh @@ -0,0 +1,62 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +set -o pipefail + +if [[ -z $TEST_DATA_DIR ]]; then + echo "Must run common.sh before kafka-common.sh." 
+ exit 1 +fi + +function verify_elasticsearch_process_exist { + ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}') + + # make sure the elasticsearch node is actually running + if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then + echo "Elasticsearch node is not running." + PASS="" + exit 1 + else + echo "Elasticsearch node is running." + fi +} + +function verify_result { + if [ -f "$TEST_DATA_DIR/output" ]; then + rm $TEST_DATA_DIR/output + fi + + curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output + + if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then + echo "Elasticsearch end to end test pass." + else + echo "Elasticsearch end to end test failed." + PASS="" + exit 1 + fi +} + +function shutdown_elasticsearch_cluster { + pid=$(jps | grep Elasticsearch | awk '{print $1}') + kill -SIGTERM $pid + + # make sure to run regular cleanup as well + cleanup +} http://git-wip-us.apache.org/repos/asf/flink/blob/a7abfcb2/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh new file mode 100755 index 0000000..dea3f13 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh @@ -0,0 +1,109 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/elasticsearch-common.sh

# Register the teardown handlers up front (instead of at the very end of the
# script, where they could never fire for an aborted run) so that Ctrl-C or
# any premature exit still stops the Elasticsearch node and runs cleanup.
trap shutdown_elasticsearch_cluster INT
trap shutdown_elasticsearch_cluster EXIT

mkdir -p $TEST_DATA_DIR

ELASTICSEARCH1_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
ELASTICSEARCH2_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
ELASTICSEARCH5_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"

# start downloading elasticsearch1
echo "Downloading Elasticsearch1 from $ELASTICSEARCH1_URL"
curl "$ELASTICSEARCH1_URL" > $TEST_DATA_DIR/elasticsearch1.tar.gz

tar xzf $TEST_DATA_DIR/elasticsearch1.tar.gz -C $TEST_DATA_DIR/
ELASTICSEARCH1_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1

# start elasticsearch1 cluster
# NOTE(review): Elasticsearch 1.x documents "-d" as the daemonize flag;
# confirm "-daemon" is accepted by the 1.7.1 launcher.
$ELASTICSEARCH1_DIR/bin/elasticsearch -daemon

verify_elasticsearch_process_exist

start_cluster

TEST_ES1_JAR=$TEST_DATA_DIR/../../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar

# run the Flink job
$FLINK_DIR/bin/flink run -p 1 $TEST_ES1_JAR \
  --index index \
  --type type

verify_result

shutdown_elasticsearch_cluster

# start downloading elasticsearch2
echo "Downloading Elasticsearch2 from $ELASTICSEARCH2_URL"
curl "$ELASTICSEARCH2_URL" > $TEST_DATA_DIR/elasticsearch2.tar.gz

tar xzf $TEST_DATA_DIR/elasticsearch2.tar.gz -C $TEST_DATA_DIR/
ELASTICSEARCH2_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5

# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell.
nohup $ELASTICSEARCH2_DIR/bin/elasticsearch &

verify_elasticsearch_process_exist

start_cluster

TEST_ES2_JAR=$TEST_DATA_DIR/../../flink-elasticsearch2-test/target/Elasticsearch2SinkExample.jar

# run the Flink job
$FLINK_DIR/bin/flink run -p 1 $TEST_ES2_JAR \
  --index index \
  --type type

verify_result

shutdown_elasticsearch_cluster

# start downloading elasticsearch5
echo "Downloading Elasticsearch5 from $ELASTICSEARCH5_URL"
curl "$ELASTICSEARCH5_URL" > $TEST_DATA_DIR/elasticsearch5.tar.gz

tar xzf $TEST_DATA_DIR/elasticsearch5.tar.gz -C $TEST_DATA_DIR/
ELASTICSEARCH5_DIR=$TEST_DATA_DIR/elasticsearch-5.1.2

# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell.
nohup $ELASTICSEARCH5_DIR/bin/elasticsearch &

verify_elasticsearch_process_exist

start_cluster

TEST_ES5_JAR=$TEST_DATA_DIR/../../flink-elasticsearch5-test/target/Elasticsearch5SinkExample.jar

# run the Flink job
$FLINK_DIR/bin/flink run -p 1 $TEST_ES5_JAR \
  --index index \
  --type type

verify_result

rm -rf $FLINK_DIR/log/* 2> /dev/null

# The elasticsearch5 node is shut down by the EXIT trap registered above.