[FLINK-9008] [e2e] Reuse flink-elasticsearch5-test job code as quickstart e2e 
test's modified job

Previously, the modified job used in the `test_quickstarts.sh` test
script was maintained as a separate Maven module. This was overkill, since all
we are doing is replacing the quickstart's contained job with something
more complex and with additional dependencies.

This commit changes this by simply reusing job code in
flink-elasticsearch5-test as the modified job,
which is copied to the quickstart project.

This closes #5823.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50931228
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50931228
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50931228

Branch: refs/heads/master
Commit: 5093122804f481021f3b33d9d5b4da2144c8cc61
Parents: 393eb67
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Fri May 18 18:45:57 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 22 16:40:30 2018 +0800

----------------------------------------------------------------------
 .../flink-quickstart-test/pom.xml               | 49 -----------
 .../quickstart/ElasticsearchStreamingJob.java   | 89 --------------------
 flink-end-to-end-tests/pom.xml                  |  1 -
 .../test-scripts/test_quickstarts.sh            | 69 ++++++---------
 4 files changed, 24 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50931228/flink-end-to-end-tests/flink-quickstart-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/pom.xml 
b/flink-end-to-end-tests/flink-quickstart-test/pom.xml
deleted file mode 100644
index 59f5416..0000000
--- a/flink-end-to-end-tests/flink-quickstart-test/pom.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-    -->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-       <parent>
-               <artifactId>flink-end-to-end-tests</artifactId>
-               <groupId>org.apache.flink</groupId>
-               <version>1.6-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <artifactId>flink-quickstart-test</artifactId>
-       <name>flink-quickstart-test</name>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-       </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/50931228/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 
b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
deleted file mode 100644
index e20e045..0000000
--- 
a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.quickstart;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
-
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Elasticsearch example for Flink Streaming Job.
- *
- * <p>In this streaming job, we generate a bunch of data from numbers, apply 
operator map
- * made a type conversion. Then we choose elasticsearch as its sink to storage 
these data.
- *
- * <p>Run test_quickstarts.sh to verify this program. Package this class to a 
jar, verify the jar,
- * then deploy it on a flink cluster.
- */
-public class ElasticsearchStreamingJob {
-
-       public static void main(String[] args) throws Exception {
-               // set up the streaming execution environment
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStream<String> source = env.generateSequence(0, 20)
-                       .map(new MapFunction<Long, String>() {
-                               @Override
-                               public String map(Long value) throws Exception {
-                                       return value.toString();
-                               }});
-
-               Map<String, String> userConfig = new HashMap<>();
-               userConfig.put("cluster.name", "elasticsearch");
-               // This instructs the sink to emit after every element, 
otherwise they would be buffered
-               
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
-               List<InetSocketAddress> transports = new ArrayList<>();
-               transports.add(new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-
-               source.addSink(new ElasticsearchSink<>(userConfig, transports, 
new ElasticsearchSinkFunction<String>(){
-                       @Override
-                       public void process(String element, RuntimeContext ctx, 
org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
-                               indexer.add(createIndexRequest(element));
-                       }
-               }));
-
-               // execute program
-               env.execute("Flink Streaming Job of writing data to 
elasticsearch");
-       }
-
-       private static IndexRequest createIndexRequest(String element) {
-               Map<String, Object> json = new HashMap<>();
-               json.put("data", element);
-
-               return Requests.indexRequest()
-                       .index("my-index")
-                       .type("my-type")
-                       .id(element)
-                       .source(json);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50931228/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 8fb7eb8..581abc8 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -47,7 +47,6 @@ under the License.
                <module>flink-elasticsearch1-test</module>
                <module>flink-elasticsearch2-test</module>
                <module>flink-elasticsearch5-test</module>
-               <module>flink-quickstart-test</module>
        </modules>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/50931228/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh 
b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
index 8f7aaab..3f63226 100755
--- a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
+++ b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
@@ -20,6 +20,7 @@
 # End to end test for quick starts test.
 
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/elasticsearch-common.sh
 
 mkdir -p $TEST_DATA_DIR
 
@@ -37,18 +38,20 @@ mvn archetype:generate                             \
 
 cd flink-java-project
 
-cp 
$TEST_DATA_DIR/../../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+# use the Flink Elasticsearch sink example job code in 
flink-elasticsearch5-tests to simulate modifications to contained job
+cp 
$TEST_INFRA_DIR/../flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
 $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+sed -i -e 's/package org.apache.flink.streaming.tests;/package 
org.apache.flink.quickstart;/' 
$TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/Elasticsearch5SinkExample.java
 
 position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
 
 sed -i -e ''"$(($position + 1))"'i\
 <dependency>\
 <groupId>org.apache.flink</groupId>\
-<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\
+<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>\
 <version>${flink.version}</version>\
 </dependency>' pom.xml
 
-sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.streaming.tests.Elasticsearch5SinkExample/"
 pom.xml
 
 mvn clean package -nsu
 
@@ -69,59 +72,35 @@ else
 fi
 
 if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" 
contentsInJar.txt` -eq '0' && \
-      `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" 
contentsInJar.txt` -eq '0' && \
-      `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" 
contentsInJar.txt` -eq '0' ]]; then
+      `grep -c "org/apache/flink/quickstart/Elasticsearch5SinkExample.class" 
contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/connectors/elasticsearch5" 
contentsInJar.txt` -eq '0' ]]; then
 
-    echo "Failure: Since ElasticsearchStreamingJob.class and other user 
classes are not included in the jar. "
+    echo "Failure: Since Elasticsearch5SinkExample.class and other user 
classes are not included in the jar. "
     PASS=""
     exit 1
 else
-    echo "Success: ElasticsearchStreamingJob.class and other user classes are 
included in the jar."
+    echo "Success: Elasticsearch5SinkExample.class and other user classes are 
included in the jar."
 fi
 
-ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
-
-curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
-tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
-ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
-
-nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
-
-ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}')
-
-# make sure the elasticsearch node is actually running
-if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then
-  echo "Elasticsearch node is not running."
-  PASS=""
-  exit 1
-else
-  echo "Elasticsearch node is running."
-fi
-
-TEST_PROGRAM_JAR=$TEST_DATA_DIR/flink-java-project/target/flink-java-project-0.1.jar
-
-start_cluster
-
-# run the Flink job
-$FLINK_DIR/bin/flink run -c 
org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR
-
-curl 'localhost:9200/my-index/_search?q=*&pretty&size=21' > 
$TEST_DATA_DIR/output
-
-if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
-    echo "Quickstarts end to end test pass."
-else
-    echo "Quickstarts end to end test failed."
-    PASS=""
-    exit 1
-fi
+setup_elasticsearch 
"https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz";
+verify_elasticsearch_process_exist
 
 function shutdownAndCleanup {
-    pid=$(jps | grep Elasticsearch | awk '{print $1}')
-    kill -SIGTERM $pid
+    shutdown_elasticsearch_cluster
 
     # make sure to run regular cleanup as well
     cleanup
 }
-
 trap shutdownAndCleanup INT
 trap shutdownAndCleanup EXIT
+
+TEST_PROGRAM_JAR=$TEST_DATA_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+start_cluster
+
+$FLINK_DIR/bin/flink run -c 
org.apache.flink.quickstart.Elasticsearch5SinkExample $TEST_PROGRAM_JAR \
+  --numRecords 20 \
+  --index index \
+  --type type
+
+verify_result 20

Reply via email to