This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 11af452 [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
11af452 is described below
commit 11af4523801164539e186d836462f5884b561941
Author: Thomas Weise <[email protected]>
AuthorDate: Thu Feb 28 16:11:50 2019 -0800
[FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
---
.../flink-streaming-kinesis-test/pom.xml | 92 +++++++++++++++
.../streaming/kinesis/test/KinesisExample.java | 91 +++++++++++++++
.../streaming/kinesis/test/KinesisExampleTest.java | 127 ++++++++++++++++++++
.../kinesis/test/KinesisPubsubClient.java | 128 +++++++++++++++++++++
flink-end-to-end-tests/pom.xml | 21 ++++
flink-end-to-end-tests/run-pre-commit-tests.sh | 1 +
flink-end-to-end-tests/test-scripts/common.sh | 2 +-
.../test-scripts/test_streaming_kinesis.sh | 63 ++++++++++
8 files changed, 524 insertions(+), 1 deletion(-)
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
new file mode 100644
index 0000000..2774964
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.8-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+
<artifactId>flink-streaming-kinesis-test_${scala.binary.version}</artifactId>
+ <name>flink-streaming-kinesis-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-kafka-test-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Use the shade plugin to build a fat jar for the
Kinesis connector test -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>fat-jar-kinesis-example</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+
<shadeTestJar>false</shadeTestJar>
+
<shadedArtifactAttached>false</shadedArtifactAttached>
+
<createDependencyReducedPom>false</createDependencyReducedPom>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+
<mainClass>org.apache.flink.streaming.kinesis.test.KinesisExample</mainClass>
+ </transformer>
+ </transformers>
+
<finalName>KinesisExample</finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
new file mode 100644
index 0000000..4957c35
--- /dev/null
+++
b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor;
+import org.apache.flink.streaming.kafka.test.base.KafkaEvent;
+import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema;
+import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
+import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
+
+import java.net.URL;
+import java.util.Properties;
+
+/**
+ * A simple example that shows how to read from and write to Kinesis. This
will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group
by some key, and finally
+ * perform a rolling addition on each key for which the results are written
back to another topic.
+ *
+ * <p>This example also demonstrates using a watermark assigner to generate
per-partition
+ * watermarks directly in the Flink Kinesis consumer. For demonstration
purposes, it is assumed that
+ * the String messages formatted as a (word,frequency,timestamp) tuple.
+ *
+ * <p>Example usage:
+ * --input-stream test-input --output-stream test-output --aws.endpoint
https://localhost:4567 --flink.stream.initpos TRIM_HORIZON
+ */
+public class KinesisExample {
+ public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool =
ParameterTool.fromArgs(args);
+ StreamExecutionEnvironment env =
KafkaExampleUtil.prepareExecutionEnv(parameterTool);
+
+ String inputStream = parameterTool.getRequired("input-stream");
+ String outputStream =
parameterTool.getRequired("output-stream");
+
+ FlinkKinesisConsumer<KafkaEvent> consumer = new
FlinkKinesisConsumer<>(
+ inputStream,
+ new KafkaEventSchema(),
+ parameterTool.getProperties());
+ consumer.setPeriodicWatermarkAssigner(new
CustomWatermarkExtractor());
+
+ Properties producerProperties = new
Properties(parameterTool.getProperties());
+ // producer needs region even when URL is specified
+
producerProperties.putIfAbsent(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ // test driver does not deaggregate
+ producerProperties.putIfAbsent("AggregationEnabled",
String.valueOf(false));
+
+ // KPL does not recognize endpoint URL..
+ String kinesisUrl =
producerProperties.getProperty(ConsumerConfigConstants.AWS_ENDPOINT);
+ if (kinesisUrl != null) {
+ URL url = new URL(kinesisUrl);
+ producerProperties.put("KinesisEndpoint",
url.getHost());
+ producerProperties.put("KinesisPort",
Integer.toString(url.getPort()));
+ producerProperties.put("VerifyCertificate", "false");
+ }
+
+ FlinkKinesisProducer<KafkaEvent> producer = new
FlinkKinesisProducer<>(
+ new KafkaEventSchema(),
+ producerProperties);
+ producer.setDefaultStream(outputStream);
+ producer.setDefaultPartition("fakePartition");
+
+ DataStream<KafkaEvent> input = env
+ .addSource(consumer)
+ .keyBy("word")
+ .map(new RollingAdditionMapper());
+
+ input.addSink(producer);
+ env.execute();
+ }
+}
diff --git
a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
new file mode 100644
index 0000000..1a6d6d7
--- /dev/null
+++
b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test driver for {@link KinesisExample#main}.
+ */
+public class KinesisExampleTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisExampleTest.class);
+
+ /**
+ * Interface to the pubsub system for this test.
+ */
+ interface PubsubClient {
+ void createTopic(String topic, int partitions, Properties
props) throws Exception;
+
+ void sendMessage(String topic, String msg);
+
+ List<String> readAllMessages(String streamName) throws
Exception;
+ }
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("System properties: {}", System.getProperties());
+ final ParameterTool parameterTool =
ParameterTool.fromArgs(args);
+
+ String inputStream = parameterTool.getRequired("input-stream");
+ String outputStream =
parameterTool.getRequired("output-stream");
+
+ PubsubClient pubsub = new
KinesisPubsubClient(parameterTool.getProperties());
+ pubsub.createTopic(inputStream, 2,
parameterTool.getProperties());
+ pubsub.createTopic(outputStream, 2,
parameterTool.getProperties());
+
+ // The example job needs to start after streams are created and
run in parallel to the validation logic.
+ // The thread that runs the job won't terminate, we don't have
a job reference to cancel it.
+ // Once results are validated, the driver main thread will
exit; job/cluster will be terminated from script.
+ final AtomicReference<Exception> executeException = new
AtomicReference<>();
+ Thread executeThread =
+ new Thread(
+ () -> {
+ try {
+ KinesisExample.main(args);
+ // this message won't appear in
the log,
+ // job is terminated when
shutting down cluster
+ LOG.info("executed program");
+ } catch (Exception e) {
+ executeException.set(e);
+ }
+ });
+ executeThread.start();
+
+ // generate input
+ String[] messages = {
+ "elephant,5,45218",
+ "squirrel,12,46213",
+ "bee,3,51348",
+ "squirrel,22,52444",
+ "bee,10,53412",
+ "elephant,9,54867"
+ };
+ for (String msg : messages) {
+ pubsub.sendMessage(inputStream, msg);
+ }
+ LOG.info("generated records");
+
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(60));
+ List<String> results = pubsub.readAllMessages(outputStream);
+ while (deadline.hasTimeLeft() && executeException.get() == null
&& results.size() < messages.length) {
+ LOG.info("waiting for results..");
+ Thread.sleep(1000);
+ results = pubsub.readAllMessages(outputStream);
+ }
+
+ if (executeException.get() != null) {
+ throw executeException.get();
+ }
+
+ LOG.info("results: {}", results);
+ Assert.assertEquals("Results received from '" + outputStream +
"': " + results,
+ messages.length, results.size());
+
+ String[] expectedResults = {
+ "elephant,5,45218",
+ "elephant,14,54867",
+ "squirrel,12,46213",
+ "squirrel,34,52444",
+ "bee,3,51348",
+ "bee,13,53412"
+ };
+
+ for (String expectedResult : expectedResults) {
+ Assert.assertTrue(expectedResult,
results.contains(expectedResult));
+ }
+
+ // TODO: main thread needs to create job or CLI fails with:
+ // "The program didn't contain a Flink job. Perhaps you forgot
to call execute() on the execution environment."
+ System.out.println("test finished");
+ System.exit(0);
+ }
+
+}
diff --git
a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
new file mode 100644
index 0000000..486b565
--- /dev/null
+++
b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.kinesis.shaded.com.amazonaws.AmazonClientException;
+import
org.apache.flink.kinesis.shaded.com.amazonaws.auth.AWSCredentialsProvider;
+import
org.apache.flink.kinesis.shaded.com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import
org.apache.flink.kinesis.shaded.com.amazonaws.client.builder.AwsClientBuilder;
+import
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
+import
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
+import
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordRequest;
+import
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordResult;
+import
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
+import
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+class KinesisPubsubClient implements KinesisExampleTest.PubsubClient {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisPubsubClient.class);
+
+ private final AmazonKinesis kinesisClient;
+ private final Properties properties;
+
+ KinesisPubsubClient(Properties properties) {
+ this.kinesisClient = createClientWithCredentials(properties);
+ this.properties = properties;
+ }
+
+ @Override
+ public void createTopic(String stream, int shards, Properties props)
throws Exception {
+ try {
+ kinesisClient.describeStream(stream);
+ kinesisClient.deleteStream(stream);
+ } catch (ResourceNotFoundException rnfe) {
+ // expected when stream doesn't exist
+ }
+
+ kinesisClient.createStream(stream, shards);
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(5));
+ while (deadline.hasTimeLeft()) {
+ try {
+ Thread.sleep(250); // sleep for a bit for
stream to be created
+ if
(kinesisClient.describeStream(stream).getStreamDescription()
+ .getShards().size() != shards) {
+ // not fully created yet
+ continue;
+ }
+ break;
+ } catch (ResourceNotFoundException rnfe) {
+ // not ready yet
+ }
+ }
+ }
+
+ @Override
+ public void sendMessage(String topic, String msg) {
+ PutRecordRequest putRecordRequest = new PutRecordRequest();
+ putRecordRequest.setStreamName(topic);
+ putRecordRequest.setPartitionKey("fakePartitionKey");
+ putRecordRequest.withData(ByteBuffer.wrap(msg.getBytes()));
+ PutRecordResult putRecordResult =
kinesisClient.putRecord(putRecordRequest);
+ LOG.info("added record: {}",
putRecordResult.getSequenceNumber());
+ }
+
+ @Override
+ public List<String> readAllMessages(String streamName) throws Exception
{
+ KinesisProxyInterface kinesisProxy =
KinesisProxy.create(properties);
+ Map<String, String> streamNamesWithLastSeenShardIds = new
HashMap<>();
+ streamNamesWithLastSeenShardIds.put(streamName, null);
+
+ GetShardListResult shardListResult =
kinesisProxy.getShardList(streamNamesWithLastSeenShardIds);
+ int maxRecordsToFetch = 10;
+
+ List<String> messages = new ArrayList<>();
+ // retrieve records from all shards
+ for (StreamShardHandle ssh :
shardListResult.getRetrievedShardListOfStream(streamName)) {
+ String shardIterator =
kinesisProxy.getShardIterator(ssh, "TRIM_HORIZON", null);
+ GetRecordsResult getRecordsResult =
kinesisProxy.getRecords(shardIterator, maxRecordsToFetch);
+ List<Record> aggregatedRecords =
getRecordsResult.getRecords();
+ for (Record record : aggregatedRecords) {
+ messages.add(new
String(record.getData().array()));
+ }
+ }
+ return messages;
+ }
+
+ private static AmazonKinesis createClientWithCredentials(Properties
props) throws AmazonClientException {
+ AWSCredentialsProvider credentialsProvider = new
EnvironmentVariableCredentialsProvider();
+ return AmazonKinesisClientBuilder.standard()
+ .withCredentials(credentialsProvider)
+ .withEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(
+
props.getProperty(ConsumerConfigConstants.AWS_ENDPOINT), "us-east-1"))
+ .build();
+ }
+
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index f6c36dc..b50ca6b 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -68,6 +68,27 @@ under the License.
<module>flink-streaming-kafka010-test</module>
</modules>
+ <!-- See main pom.xml for explanation of profiles -->
+ <profiles>
+ <!--
+ We include the kinesis module only optionally because
it contains a dependency
+ licenced under the "Amazon Software License".
+ In accordance with the discussion in
https://issues.apache.org/jira/browse/LEGAL-198
+ this is an optional module for Flink.
+ -->
+ <profile>
+ <id>include-kinesis</id>
+ <activation>
+ <property>
+ <name>include-kinesis</name>
+ </property>
+ </activation>
+ <modules>
+ <module>flink-streaming-kinesis-test</module>
+ </modules>
+ </profile>
+ </profiles>
+
<build>
<plugins>
<plugin>
diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh
b/flink-end-to-end-tests/run-pre-commit-tests.sh
index 4a9a1a0..f40fd56 100755
--- a/flink-end-to-end-tests/run-pre-commit-tests.sh
+++ b/flink-end-to-end-tests/run-pre-commit-tests.sh
@@ -56,6 +56,7 @@ run_test "Wordcount end-to-end test"
"$END_TO_END_DIR/test-scripts/test_batch_wo
run_test "Kafka 0.10 end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_kafka010.sh"
run_test "Kafka 0.11 end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_kafka011.sh"
run_test "Modern Kafka end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_kafka.sh"
+run_test "Kinesis end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_kinesis.sh"
run_test "class loading end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_classloader.sh"
run_test "Shaded Hadoop S3A end-to-end test"
"$END_TO_END_DIR/test-scripts/test_shaded_hadoop_s3a.sh"
run_test "Shaded Presto S3 end-to-end test"
"$END_TO_END_DIR/test-scripts/test_shaded_presto_s3.sh"
diff --git a/flink-end-to-end-tests/test-scripts/common.sh
b/flink-end-to-end-tests/test-scripts/common.sh
index c06249e..1a54aba 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -47,7 +47,7 @@ cd $TEST_INFRA_DIR
TEST_INFRA_DIR=`pwd -P`
cd $TEST_ROOT
-NODENAME=`hostname -f`
+NODENAME=${NODENAME:-`hostname -f`}
# REST_PROTOCOL and CURL_SSL_ARGS can be modified in common_ssl.sh if SSL is
activated
# they should be used in curl command to query Flink REST API
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
b/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
new file mode 100755
index 0000000..1981b13
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
@@ -0,0 +1,63 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# Kinesalite doesn't support CBOR
+export AWS_CBOR_DISABLE=1
+
+# Required by the KPL native process
+export AWS_ACCESS_KEY_ID=flinkKinesisTestFakeAccessKeyId
+export AWS_SECRET_KEY=flinkKinesisTestFakeAccessKey
+
+KINESALITE_PORT=4567
+
+#docker run -d --rm --name flink-test-kinesis -p
${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite
+# override entrypoint to enable SSL
+docker run -d --rm --entrypoint "/tini" --name flink-test-kinesis -p
${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite --
/usr/src/app/node_modules/kinesalite/cli.js --path /var/lib/kinesalite --ssl
+
+# reveal potential issues with the container in the CI environment
+docker logs flink-test-kinesis
+
+function test_cleanup {
+ # don't call ourselves again for another signal interruption
+ trap "exit -1" INT
+ # don't call ourselves again for normal exit
+ trap "" EXIT
+ # job needs to stop before kinesalite
+ stop_cluster
+ echo "terminating kinesalite"
+ docker kill flink-test-kinesis
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+# prefix com.amazonaws.sdk.disableCertChecking to account for shading
+DISABLE_CERT_CHECKING_JAVA_OPTS="-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking"
+
+export FLINK_ENV_JAVA_OPTS=${DISABLE_CERT_CHECKING_JAVA_OPTS}
+start_cluster
+
+TEST_JAR="${END_TO_END_DIR}/flink-streaming-kinesis-test/target/KinesisExample.jar"
+JVM_ARGS=${DISABLE_CERT_CHECKING_JAVA_OPTS} \
+$FLINK_DIR/bin/flink run -p 1 -c
org.apache.flink.streaming.kinesis.test.KinesisExampleTest $TEST_JAR \
+ --input-stream test-input --output-stream test-output \
+ --aws.endpoint https://localhost:${KINESALITE_PORT}
--aws.credentials.provider.basic.secretkey fakekey
--aws.credentials.provider.basic.accesskeyid fakeid \
+ --flink.stream.initpos TRIM_HORIZON \
+ --flink.partition-discovery.interval-millis 1000