[FLINK-9008] [e2e] Implements quickstarts end to end test

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/393eb679
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/393eb679
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/393eb679

Branch: refs/heads/master
Commit: 393eb679d39658c2592cf736d7b50921ed6c5319
Parents: 71095dc
Author: zhangminglei <[email protected]>
Authored: Thu May 17 21:05:10 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 22 16:00:17 2018 +0800

----------------------------------------------------------------------
 .../flink-quickstart-test/pom.xml               |  49 +++++++
 .../quickstart/ElasticsearchStreamingJob.java   |  89 +++++++++++++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |   8 ++
 .../test-scripts/test_quickstarts.sh            | 127 +++++++++++++++++++
 5 files changed, 274 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/393eb679/flink-end-to-end-tests/flink-quickstart-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/pom.xml b/flink-end-to-end-tests/flink-quickstart-test/pom.xml
new file mode 100644
index 0000000..59f5416
--- /dev/null
+++ b/flink-end-to-end-tests/flink-quickstart-test/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+    -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.6-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-quickstart-test</artifactId>
+       <name>flink-quickstart-test</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/393eb679/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
new file mode 100644
index 0000000..e20e045
--- /dev/null
+++ b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstart;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch example for a Flink streaming job.
+ *
+ * <p>The job generates the numbers 0 to 20 (inclusive), converts each of them to a
+ * {@code String} with a map operator, and writes the results to Elasticsearch through
+ * an {@link ElasticsearchSink}.
+ *
+ * <p>Run test_quickstarts.sh to verify this program: the script packages this class into
+ * a jar, verifies the contents of the jar, and then deploys it on a Flink cluster.
+ */
+public class ElasticsearchStreamingJob {
+
+       /**
+        * Builds the streaming pipeline and executes it.
+        *
+        * @param args command line arguments (not used)
+        * @throws Exception if the job cannot be set up or its execution fails
+        */
+       public static void main(String[] args) throws Exception {
+               // set up the streaming execution environment
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               // 21 elements in total: 0, 1, ..., 20, each converted to its string form
+               DataStream<String> source = env.generateSequence(0, 20)
+                       .map(new MapFunction<Long, String>() {
+                               @Override
+                               public String map(Long value) throws Exception {
+                                       return value.toString();
+                               }});
+
+               Map<String, String> userConfig = new HashMap<>();
+               userConfig.put("cluster.name", "elasticsearch");
+               // This instructs the sink to emit after every element, otherwise they would be buffered
+               userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+               // address handed to the sink as the Elasticsearch transport endpoint
+               List<InetSocketAddress> transports = new ArrayList<>();
+               transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+               source.addSink(new ElasticsearchSink<>(
+                       userConfig,
+                       transports,
+                       new ElasticsearchSinkFunction<String>(){
+                               @Override
+                               public void process(
+                                               String element,
+                                               RuntimeContext ctx,
+                                               org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
+                                       indexer.add(createIndexRequest(element));
+                               }
+                       }));
+
+               // execute program
+               env.execute("Flink Streaming Job of writing data to elasticsearch");
+       }
+
+       /**
+        * Creates the index request for a single stream element.
+        *
+        * <p>The request targets index {@code my-index} and type {@code my-type}, uses the
+        * element itself as the document id, and stores the element under the key {@code data}.
+        *
+        * @param element the stream element to index
+        * @return the index request describing the document for {@code element}
+        */
+       private static IndexRequest createIndexRequest(String element) {
+               Map<String, Object> json = new HashMap<>();
+               json.put("data", element);
+
+               return Requests.indexRequest()
+                       .index("my-index")
+                       .type("my-type")
+                       .id(element)
+                       .source(json);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/393eb679/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 581abc8..8fb7eb8 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -47,6 +47,7 @@ under the License.
                <module>flink-elasticsearch1-test</module>
                <module>flink-elasticsearch2-test</module>
                <module>flink-elasticsearch5-test</module>
+               <module>flink-quickstart-test</module>
        </modules>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/393eb679/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 0ec3492..2a15e6d 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -191,5 +191,13 @@ if [ $EXIT_CODE == 0 ]; then
   EXIT_CODE=$?
 fi
 
+# Quickstarts test: only runs while EXIT_CODE is still 0, and then takes over
+# the exit code of the test script.
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running Quickstarts nightly end-to-end test\n"
+  printf "==============================================================================\n"
+  $END_TO_END_DIR/test-scripts/test_quickstarts.sh
+  EXIT_CODE=$?
+fi
+
 # Exit code for Travis build success/failure
 exit $EXIT_CODE

http://git-wip-us.apache.org/repos/asf/flink/blob/393eb679/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
new file mode 100755
index 0000000..8f7aaab
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
@@ -0,0 +1,127 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# End-to-end test for the Flink Java quickstart.
+#
+# Steps:
+#   1. Generate a project from the flink-quickstart-java archetype.
+#   2. Copy ElasticsearchStreamingJob into it and add the elasticsearch2
+#      connector dependency to its pom.xml.
+#   3. Build the project and check the contents of the resulting jar.
+#   4. Start a local Elasticsearch 2.3.5 node and a Flink cluster, run the job
+#      and check that Elasticsearch reports 21 documents.
+#
+# NOTE(review): TEST_DATA_DIR, FLINK_DIR, start_cluster and cleanup are not
+# defined in this script; they are expected to come from common.sh - confirm.
+
+source "$(dirname "$0")"/common.sh
+
+# Stops the Elasticsearch node (if one is running) and then runs the regular
+# cleanup. It is registered before anything is started so that the early
+# "exit 1" paths below do not leave an Elasticsearch process behind.
+function shutdownAndCleanup {
+    pid=$(jps | grep Elasticsearch | awk '{print $1}')
+    # $pid is intentionally unquoted: jps may report more than one match
+    if [ -n "$pid" ]; then
+        kill -SIGTERM $pid
+    fi
+
+    # make sure to run regular cleanup as well
+    cleanup
+}
+
+# On Ctrl-C just exit; the EXIT trap then performs the cleanup exactly once.
+trap 'exit 1' INT
+trap shutdownAndCleanup EXIT
+
+mkdir -p "$TEST_DATA_DIR"
+
+cd "$TEST_DATA_DIR"
+
+mvn archetype:generate                             \
+    -DarchetypeGroupId=org.apache.flink            \
+    -DarchetypeArtifactId=flink-quickstart-java    \
+    -DarchetypeVersion=1.6-SNAPSHOT                \
+    -DgroupId=org.apache.flink.quickstart          \
+    -DartifactId=flink-java-project                \
+    -Dversion=0.1                                  \
+    -Dpackage=org.apache.flink.quickstart          \
+    -DinteractiveMode=false
+
+cd flink-java-project
+
+# copy the test job into the generated project
+cp "$TEST_DATA_DIR/../../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java" \
+   "$TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/"
+
+# line number of the first <dependencies> tag in the generated pom.xml
+position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
+
+# insert the connector dependency directly below that tag
+sed -i -e ''"$(($position + 1))"'i\
+<dependency>\
+<groupId>org.apache.flink</groupId>\
+<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\
+<version>${flink.version}</version>\
+</dependency>' pom.xml
+
+# replace references to StreamingJob in pom.xml with ElasticsearchStreamingJob
+sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+# The jar must not contain any of the Flink core packages listed here.
+# NOTE(review): PASS is cleared before every failing exit; presumably it is
+# read by cleanup in common.sh - confirm.
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \
+      `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then
+
+    echo "Success: There are no flink core classes are contained in the jar."
+else
+    echo "Failure: There are flink core classes are contained in the jar."
+    PASS=""
+    exit 1
+fi
+
+# The jar must contain ALL of the user classes and the connector, so the test
+# fails as soon as any one of them is missing (hence "||", not "&&").
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' || \
+      `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' || \
+      `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then
+
+    echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. "
+    PASS=""
+    exit 1
+else
+    echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar."
+fi
+
+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
+
+curl "$ELASTICSEARCH_URL" > "$TEST_DATA_DIR/elasticsearch.tar.gz"
+tar xzf "$TEST_DATA_DIR/elasticsearch.tar.gz" -C "$TEST_DATA_DIR/"
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+nohup "$ELASTICSEARCH_DIR/bin/elasticsearch" &
+
+# The node was just started in the background, so it may not be listed by jps
+# yet; poll for up to 30 seconds instead of checking exactly once.
+ELASTICSEARCH_PROCESS=""
+for i in {1..30}; do
+  ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}')
+  if [ "$ELASTICSEARCH_PROCESS" == "Elasticsearch" ]; then
+    break
+  fi
+  sleep 1
+done
+
+# make sure the elasticsearch node is actually running
+if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then
+  echo "Elasticsearch node is not running."
+  PASS=""
+  exit 1
+else
+  echo "Elasticsearch node is running."
+fi
+
+TEST_PROGRAM_JAR=$TEST_DATA_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+start_cluster
+
+# run the Flink job
+"$FLINK_DIR/bin/flink" run -c org.apache.flink.quickstart.ElasticsearchStreamingJob "$TEST_PROGRAM_JAR"
+
+# the job writes the numbers 0..20, so 21 documents are expected in my-index
+curl 'localhost:9200/my-index/_search?q=*&pretty&size=21' > "$TEST_DATA_DIR/output"
+
+if [ -n "$(grep '\"total\" : 21' "$TEST_DATA_DIR/output")" ]; then
+    echo "Quickstarts end to end test pass."
+else
+    echo "Quickstarts end to end test failed."
+    PASS=""
+    exit 1
+fi

Reply via email to