This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7f887a96626c94ce21db4b27bb70570bc225bcab Author: Alexander Fedulov <[email protected]> AuthorDate: Wed Apr 20 12:16:35 2022 +0200 [FLINK-27066][tests][connectors/elasticsearch] Remove bash-based E2E Elasticsearch tests --- .../flink-elasticsearch6-test/pom.xml | 91 ------------------ .../streaming/tests/Elasticsearch6SinkExample.java | 106 --------------------- .../flink-elasticsearch7-test/pom.xml | 91 ------------------ .../streaming/tests/Elasticsearch7SinkExample.java | 101 -------------------- flink-end-to-end-tests/pom.xml | 2 - flink-end-to-end-tests/run-nightly-tests.sh | 3 - .../test-scripts/test_streaming_elasticsearch.sh | 48 ---------- 7 files changed, 442 deletions(-) diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml deleted file mode 100644 index 0d6da4e6dcb..00000000000 --- a/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml +++ /dev/null @@ -1,91 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-end-to-end-tests</artifactId> - <version>1.16-SNAPSHOT</version> - </parent> - - <artifactId>flink-elasticsearch6-test</artifactId> - <name>Flink : E2E Tests : Elasticsearch 6</name> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-elasticsearch6</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>Elasticsearch6SinkExample</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <finalName>Elasticsearch6SinkExample</finalName> - <artifactSet> - <excludes> - <exclude>com.google.code.findbugs:jsr305</exclude> - </excludes> - </artifactSet> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - </excludes> - </filter> - </filters> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.flink.streaming.tests.Elasticsearch6SinkExample</mainClass> - </transformer> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java deleted file mode 100644 index 72433514312..00000000000 --- a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.tests; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder; -import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.Collector; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -/** End to end test for Elasticsearch6Sink. */ -public class Elasticsearch6SinkExample { - - public static void main(String[] args) throws Exception { - - final ParameterTool parameterTool = ParameterTool.fromArgs(args); - - if (parameterTool.getNumberOfParameters() < 3) { - System.out.println( - "Missing parameters!\n" - + "Usage: --numRecords <numRecords> --index <index> --type <type>"); - return; - } - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(5000); - - DataStream<Tuple2<String, String>> source = - env.fromSequence(0, parameterTool.getInt("numRecords") - 1) - .flatMap( - new FlatMapFunction<Long, Tuple2<String, String>>() { - @Override - public void flatMap( - Long value, Collector<Tuple2<String, String>> out) { - final String key = String.valueOf(value); - final String message = "message #" + value; - out.collect(Tuple2.of(key, message + "update #1")); - out.collect(Tuple2.of(key, message + "update #2")); - } - }); - - ElasticsearchSink<Tuple2<String, String>> sink = - new Elasticsearch6SinkBuilder<Tuple2<String, String>>() - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, ctx, indexer) -> { - indexer.add(createIndexRequest(element.f1, parameterTool)); - indexer.add(createUpdateRequest(element, parameterTool)); - }) - .setBulkFlushMaxActions(1) // emit after every element, don't buffer - .build(); - - source.sinkTo(sink); - env.execute("Elasticsearch 6.x end to end sink test example"); - } - - private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index(parameterTool.getRequired("index")) - .type(parameterTool.getRequired("type")) - .id(element) - .source(json); - } - - private static UpdateRequest createUpdateRequest( - Tuple2<String, String> element, ParameterTool parameterTool) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element.f1); - - return new UpdateRequest( - parameterTool.getRequired("index"), - parameterTool.getRequired("type"), - element.f0) - .doc(json) - .upsert(json); - } -} diff --git a/flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml deleted file mode 100644 index cebe36bdf14..00000000000 --- a/flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml +++ /dev/null @@ -1,91 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-end-to-end-tests</artifactId> - <version>1.16-SNAPSHOT</version> - </parent> - - <artifactId>flink-elasticsearch7-test</artifactId> - <name>Flink : E2E Tests : Elasticsearch 7</name> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-elasticsearch7</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>Elasticsearch7SinkExample</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <finalName>Elasticsearch7SinkExample</finalName> - <artifactSet> - <excludes> - <exclude>com.google.code.findbugs:jsr305</exclude> - </excludes> - </artifactSet> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - </excludes> - </filter> - </filters> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.flink.streaming.tests.Elasticsearch7SinkExample</mainClass> - </transformer> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> diff --git a/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java deleted file mode 100644 index 10a4bc785ac..00000000000 --- a/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.tests; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder; -import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.Collector; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -/** End to end test for Elasticsearch7Sink. */ -public class Elasticsearch7SinkExample { - - public static void main(String[] args) throws Exception { - - final ParameterTool parameterTool = ParameterTool.fromArgs(args); - - if (parameterTool.getNumberOfParameters() < 2) { - System.out.println( - "Missing parameters!\n" + "Usage: --numRecords <numRecords> --index <index>"); - return; - } - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(5000); - - DataStream<Tuple2<String, String>> source = - env.fromSequence(0, parameterTool.getInt("numRecords") - 1) - .flatMap( - new FlatMapFunction<Long, Tuple2<String, String>>() { - @Override - public void flatMap( - Long value, Collector<Tuple2<String, String>> out) { - final String key = String.valueOf(value); - final String message = "message #" + value; - out.collect(Tuple2.of(key, message + "update #1")); - out.collect(Tuple2.of(key, message + "update #2")); - } - }); - - ElasticsearchSink<Tuple2<String, String>> sink = - new Elasticsearch7SinkBuilder<Tuple2<String, String>>() - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, ctx, indexer) -> { - indexer.add(createIndexRequest(element.f1, parameterTool)); - indexer.add(createUpdateRequest(element, parameterTool)); - }) - .setBulkFlushMaxActions(1) // emit after every element, don't buffer - .build(); - - source.sinkTo(sink); - env.execute("Elasticsearch 7.x end to end sink test example"); - } - - private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index(parameterTool.getRequired("index")) - .id(element) - .source(json); - } - - private static UpdateRequest createUpdateRequest( - Tuple2<String, String> element, ParameterTool parameterTool) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element.f1); - - return new UpdateRequest(parameterTool.getRequired("index"), element.f0) - .doc(json) - .upsert(json); - } -} diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 7a1b3ba121f..d2ca9322d5a 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -51,7 +51,6 @@ under the License. <module>flink-stream-stateful-job-upgrade-test</module> <module>flink-queryable-state-test</module> <module>flink-local-recovery-and-allocation-test</module> - <module>flink-elasticsearch6-test</module> <module>flink-quickstart-test</module> <module>flink-confluent-schema-registry</module> <module>flink-stream-state-ttl-test</module> @@ -69,7 +68,6 @@ under the License. <module>flink-plugins-test</module> <module>flink-tpch-test</module> <module>flink-streaming-kinesis-test</module> - <module>flink-elasticsearch7-test</module> <module>flink-end-to-end-tests-common-kafka</module> <module>flink-tpcds-test</module> <module>flink-netty-shuffle-memory-control-test</module> diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 0a8b044800d..a9a78c92aef 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -199,9 +199,6 @@ function run_group_2 { run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh" - run_test "Elasticsearch (v6.8.20) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.8.20.tar.gz" - run_test "Elasticsearch (v7.15.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 7 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.15.2-linux-x86_64.tar.gz" - run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java" run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala" diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh deleted file mode 100755 index e2ee273d43a..00000000000 --- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bash -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/elasticsearch-common.sh - -ELASTICSEARCH_VERSION=$1 -DOWNLOAD_URL=$2 - -mkdir -p $TEST_DATA_DIR - -setup_elasticsearch $DOWNLOAD_URL $ELASTICSEARCH_VERSION -wait_elasticsearch_working - -start_cluster - -function test_cleanup { - shutdown_elasticsearch_cluster index -} - -on_exit test_cleanup - -TEST_ES_JAR=${END_TO_END_DIR}/flink-elasticsearch${ELASTICSEARCH_VERSION}-test/target/Elasticsearch${ELASTICSEARCH_VERSION}SinkExample.jar - -# run the Flink job -$FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \ - --numRecords 20 \ - --index index \ - --type type - -# 40 index requests and 20 final update requests -verify_result_line_number 60 index
