This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
commit 861b8d7ef7e1bd3c111df4d3e71911cd7e67a3b8 Author: Danny Cranmer <[email protected]> AuthorDate: Sat Dec 3 20:44:17 2022 +0000 [FLINK-29908][Connectors/Firehose] Externalize and configure E2E tests for Kinesis connector --- .../pom.xml | 2 +- .../pom.xml | 2 +- .../flink-connector-kinesis-e2e-tests/pom.xml | 177 ++++++++++++++++++++ .../kinesis/test/CustomWatermarkExtractor.java | 53 ++++++ .../apache/flink/streaming/kinesis/test/Event.java | 71 ++++++++ .../flink/streaming/kinesis/test/EventSchema.java | 53 ++++++ .../streaming/kinesis/test/KinesisExample.java | 102 ++++++++++++ .../streaming/kinesis/test/KinesisExampleTest.java | 122 ++++++++++++++ .../kinesis/test/RollingAdditionMapper.java | 55 ++++++ .../kinesis/test/KinesisTableApiITCase.java | 184 +++++++++++++++++++++ .../flink/streaming/kinesis/test/model/Order.java | 65 ++++++++ .../src/test/resources/filter-large-orders.sql | 52 ++++++ .../src/test/resources/log4j2-test.properties | 28 ++++ flink-connector-aws-e2e-tests/pom.xml | 1 + pom.xml | 6 - 15 files changed, 965 insertions(+), 8 deletions(-) diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml index b2e0878..62ade01 100644 --- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml +++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml @@ -30,7 +30,7 @@ <modelVersion>4.0.0</modelVersion> <artifactId>flink-connector-aws-kinesis-firehose-e2e-tests</artifactId> - <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Firehose e2e tests</name> + <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Firehose</name> <packaging>jar</packaging> <dependencies> diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml index 
1a5d793..5557abf 100644 --- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml +++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml @@ -30,7 +30,7 @@ <modelVersion>4.0.0</modelVersion> <artifactId>flink-connector-aws-kinesis-streams-e2e-tests</artifactId> - <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Streams e2e tests</name> + <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Streams v2</name> <packaging>jar</packaging> <dependencies> diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/pom.xml new file mode 100644 index 0000000..e596dc1 --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/pom.xml @@ -0,0 +1,177 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <artifactId>flink-connector-aws-e2e-tests-parent</artifactId> + <groupId>org.apache.flink</groupId> + <version>4.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-connector-kinesis-e2e-tests</artifactId> + <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Streams</name> + <packaging>jar</packaging> + + <dependencies> + <!-- <dependency>--> + <!-- <groupId>org.apache.flink</groupId>--> + <!-- <artifactId>flink-streaming-kafka-test-base</artifactId>--> + <!-- <version>${flink.version}</version>--> + <!-- </dependency>--> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kinesis</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kinesis</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-aws-base</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-aws-kinesis-streams</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> 
+ <!-- Use the shade plugin to build a fat jar for the Kinesis connector test --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>fat-jar-kinesis-example</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadedArtifactAttached>false</shadedArtifactAttached> + <createDependencyReducedPom>false</createDependencyReducedPom> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.kinesis.test.KinesisExample</mainClass> + </transformer> + </transformers> + <finalName>KinesisExample</finalName> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy</id> + <phase>pre-integration-test</phase> + <goals> + <goal>copy</goal> + </goals> + </execution> + </executions> + <configuration> + <artifactItems> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-connector-kinesis</artifactId> + <version>${project.version}</version> + <destFileName>sql-kinesis.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + </artifactItems> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <!-- Required for Kinesalite. 
--> + <!-- Including shaded and non-shaded conf to support test running from Maven and IntelliJ --> + <com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor> + <com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking> + <org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor>true + </org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor> + <org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking>true + </org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking> + </systemPropertyVariables> + </configuration> + </plugin> + + <!-- Skip dependency convergence check because of guava version --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <executions> + <execution> + <id>dependency-convergence</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/CustomWatermarkExtractor.java b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/CustomWatermarkExtractor.java new file mode 100644 index 0000000..cb95e47 --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/CustomWatermarkExtractor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.kinesis.test; + +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; +import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; +import org.apache.flink.streaming.api.watermark.Watermark; + +import javax.annotation.Nullable; + +/** + * A custom {@link AssignerWithPeriodicWatermarks}, that simply assumes that the input stream + * records are strictly ascending. + * + * <p>Flink also ships some built-in convenience assigners, such as the {@link + * BoundedOutOfOrdernessTimestampExtractor} and {@link AscendingTimestampExtractor} + */ +public class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<Event> { + + private static final long serialVersionUID = -742759155861320823L; + + private long currentTimestamp = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Event event, long previousElementTimestamp) { + // the inputs are assumed to be of format (message,timestamp) + this.currentTimestamp = event.getTimestamp(); + return event.getTimestamp(); + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark( + currentTimestamp == Long.MIN_VALUE ? 
Long.MIN_VALUE : currentTimestamp - 1); + } +} diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/Event.java b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/Event.java new file mode 100644 index 0000000..86bb44b --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/Event.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.kinesis.test; + +/** + * This is a Java POJO, which Flink recognizes and will allow "by-name" field referencing when + * keying a {@link org.apache.flink.streaming.api.datastream.DataStream} of such a type. 
+ */ +public class Event { + + private String word; + private int frequency; + private long timestamp; + + public Event() {} + + public Event(String word, int frequency, long timestamp) { + this.word = word; + this.frequency = frequency; + this.timestamp = timestamp; + } + + public String getWord() { + return word; + } + + public void setWord(String word) { + this.word = word; + } + + public int getFrequency() { + return frequency; + } + + public void setFrequency(int frequency) { + this.frequency = frequency; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public static Event fromString(String eventStr) { + String[] split = eventStr.split(","); + return new Event(split[0], Integer.valueOf(split[1]), Long.valueOf(split[2])); + } + + @Override + public String toString() { + return word + "," + frequency + "," + timestamp; + } +} diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/EventSchema.java b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/EventSchema.java new file mode 100644 index 0000000..a158fef --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/EventSchema.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.kinesis.test; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +/** + * The serialization schema for the {@link Event} type. This class defines how to transform a Kafka + * record's bytes to a {@link Event}, and vice-versa. + */ +public class EventSchema implements DeserializationSchema<Event>, SerializationSchema<Event> { + + private static final long serialVersionUID = 1L; + + @Override + public byte[] serialize(Event event) { + return event.toString().getBytes(); + } + + @Override + public Event deserialize(byte[] message) throws IOException { + return Event.fromString(new String(message)); + } + + @Override + public boolean isEndOfStream(Event nextElement) { + return false; + } + + @Override + public TypeInformation<Event> getProducedType() { + return TypeInformation.of(Event.class); + } +} diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java new file mode 100644 index 0000000..7c67bfd --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java @@ -0,0 +1,102 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.kinesis.test; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; + +import java.net.URL; +import java.util.Properties; + +/** + * A simple example that shows how to read from and write to Kinesis. This will read String messages + * from the input topic, parse them into a POJO type {@link Event}, group by some key, and finally + * perform a rolling addition on each key for which the results are written back to another topic. + * + * <p>This example also demonstrates using a watermark assigner to generate per-partition watermarks + * directly in the Flink Kinesis consumer. For demonstration purposes, it is assumed that the String + * messages formatted as a (word,frequency,timestamp) tuple. 
+ * + * <p>Example usage: --input-stream test-input --output-stream test-output --aws.endpoint + * https://localhost:4567 --flink.stream.initpos TRIM_HORIZON + */ +public class KinesisExample { + public static void main(String[] args) throws Exception { + // parse input arguments + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + if (parameterTool.getNumberOfParameters() < 5) { + System.out.println( + "Missing parameters!\n" + + "Usage: Kafka --input-topic <topic> --output-topic <topic> " + + "--bootstrap.servers <kafka brokers> " + + "--group.id <some id>"); + throw new Exception( + "Missing parameters!\n" + + "Usage: Kafka --input-topic <topic> --output-topic <topic> " + + "--bootstrap.servers <kafka brokers> " + + "--group.id <some id>"); + } + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); + env.enableCheckpointing(5000); // create a checkpoint every 5 seconds + env.getConfig() + .setGlobalJobParameters( + parameterTool); // make parameters available in the web interface + + String inputStream = parameterTool.getRequired("input-stream"); + String outputStream = parameterTool.getRequired("output-stream"); + + FlinkKinesisConsumer<Event> consumer = + new FlinkKinesisConsumer<>( + inputStream, new EventSchema(), parameterTool.getProperties()); + consumer.setPeriodicWatermarkAssigner(new CustomWatermarkExtractor()); + + Properties producerProperties = new Properties(parameterTool.getProperties()); + // producer needs region even when URL is specified + producerProperties.putIfAbsent(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + // test driver does not deaggregate + producerProperties.putIfAbsent("AggregationEnabled", String.valueOf(false)); + + // KPL does not recognize endpoint URL.. 
+ String kinesisUrl = producerProperties.getProperty(ConsumerConfigConstants.AWS_ENDPOINT); + if (kinesisUrl != null) { + URL url = new URL(kinesisUrl); + producerProperties.put("KinesisEndpoint", url.getHost()); + producerProperties.put("KinesisPort", Integer.toString(url.getPort())); + producerProperties.put("VerifyCertificate", "false"); + } + + FlinkKinesisProducer<Event> producer = + new FlinkKinesisProducer<>(new EventSchema(), producerProperties); + producer.setDefaultStream(outputStream); + producer.setDefaultPartition("fakePartition"); + + DataStream<Event> input = + env.addSource(consumer).keyBy("word").map(new RollingAdditionMapper()); + + input.addSink(producer); + env.execute(); + } +} diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java new file mode 100644 index 0000000..d6ea8ad --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.kinesis.test; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +/** Test driver for {@link KinesisExample#main}. */ +public class KinesisExampleTest { + private static final Logger LOG = LoggerFactory.getLogger(KinesisExampleTest.class); + + public static void main(String[] args) throws Exception { + LOG.info("System properties: {}", System.getProperties()); + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + String inputStream = parameterTool.getRequired("input-stream"); + String outputStream = parameterTool.getRequired("output-stream"); + + KinesisPubsubClient pubsub = new KinesisPubsubClient(parameterTool.getProperties()); + pubsub.createTopic(inputStream, 2, parameterTool.getProperties()); + pubsub.createTopic(outputStream, 2, parameterTool.getProperties()); + + // The example job needs to start after streams are created and run in parallel to the + // validation logic. + // The thread that runs the job won't terminate, we don't have a job reference to cancel it. + // Once results are validated, the driver main thread will exit; job/cluster will be + // terminated from script. 
+ final AtomicReference<Exception> executeException = new AtomicReference<>(); + Thread executeThread = + new Thread( + () -> { + try { + KinesisExample.main(args); + // this message won't appear in the log, + // job is terminated when shutting down cluster + LOG.info("executed program"); + } catch (Exception e) { + executeException.set(e); + } + }); + executeThread.start(); + + // generate input + String[] messages = { + "elephant,5,45218", + "squirrel,12,46213", + "bee,3,51348", + "squirrel,22,52444", + "bee,10,53412", + "elephant,9,54867" + }; + for (String msg : messages) { + pubsub.sendMessage(inputStream, msg); + } + LOG.info("generated records"); + + Deadline deadline = Deadline.fromNow(Duration.ofSeconds(60)); + List<String> results = pubsub.readAllMessages(outputStream); + while (deadline.hasTimeLeft() + && executeException.get() == null + && results.size() < messages.length) { + LOG.info("waiting for results.."); + Thread.sleep(1000); + results = pubsub.readAllMessages(outputStream); + } + + if (executeException.get() != null) { + throw executeException.get(); + } + + LOG.info("results: {}", results); + + // Validate.isTrue( + // results.size() == messages.length, + // "Expecting results to equal " + results.size() + " , but was " + + // messages.length); + + String[] expectedResults = { + "elephant,5,45218", + "elephant,14,54867", + "squirrel,12,46213", + "squirrel,34,52444", + "bee,3,51348", + "bee,13,53412" + }; + + for (String expectedResult : expectedResults) { + // Validate.isTrue( + // results.contains(expectedResult), "Expecting to receive " + + // expectedResult); + } + + // TODO: main thread needs to create job or CLI fails with: + // "The program didn't contain a Flink job. Perhaps you forgot to call execute() on the + // execution environment." 
+ System.out.println("test finished"); + System.exit(0); + } +} diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/RollingAdditionMapper.java b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/RollingAdditionMapper.java new file mode 100644 index 0000000..4b1f327 --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/main/java/org/apache/flink/streaming/kinesis/test/RollingAdditionMapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.kinesis.test; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.configuration.Configuration; + +/** + * A {@link RichMapFunction} that continuously outputs the current total frequency count of a key. + * The current total count is keyed state managed by Flink. 
+ */ +public class RollingAdditionMapper extends RichMapFunction<Event, Event> { + + private static final long serialVersionUID = 1180234853172462378L; + + private transient ValueState<Integer> currentTotalCount; + + @Override + public Event map(Event event) throws Exception { + Integer totalCount = currentTotalCount.value(); + + if (totalCount == null) { + totalCount = 0; + } + totalCount += event.getFrequency(); + + currentTotalCount.update(totalCount); + + return new Event(event.getWord(), totalCount, event.getTimestamp()); + } + + @Override + public void open(Configuration parameters) throws Exception { + currentTotalCount = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("currentTotalCount", Integer.class)); + } +} diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java new file mode 100644 index 0000000..5597b57 --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.kinesis.test; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient; +import org.apache.flink.streaming.kinesis.test.model.Order; +import org.apache.flink.test.resources.ResourceTestUtils; +import org.apache.flink.test.util.SQLJobSubmission; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** End-to-end test for Kinesis Table API using Kinesalite. 
*/ +public class KinesisTableApiITCase extends TestLogger { + + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisTableApiITCase.class); + + private static final String ORDERS_STREAM = "orders"; + private static final String LARGE_ORDERS_STREAM = "large_orders"; + private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite"; + + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + + private final Path sqlConnectorKinesisJar = ResourceTestUtils.getResource(".*kinesis.jar"); + private static final Network network = Network.newNetwork(); + + @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES); + + @ClassRule + public static final KinesaliteContainer KINESALITE = + new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE)) + .withNetwork(network) + .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS); + + private KinesisPubsubClient kinesisClient; + + public static final TestcontainersSettings TESTCONTAINERS_SETTINGS = + TestcontainersSettings.builder() + .environmentVariable("AWS_CBOR_DISABLE", "1") + .environmentVariable( + "FLINK_ENV_JAVA_OPTS", + "-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false") + .network(network) + .logger(LOGGER) + .dependsOn(KINESALITE) + .build(); + + public static final FlinkContainers FLINK = + FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build(); + + @BeforeClass + public static void setupFlink() throws Exception { + FLINK.start(); + } + + @AfterClass + public static void stopFlink() { + FLINK.stop(); + } + + @Before + public void setUp() throws Exception { + Properties properties = KINESALITE.getContainerProperties(); + + kinesisClient = new KinesisPubsubClient(properties); + kinesisClient.createTopic(ORDERS_STREAM, 1, properties); + kinesisClient.createTopic(LARGE_ORDERS_STREAM, 1, properties); + + 
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); + } + + @After + public void teardown() { + System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property()); + } + + @Test + public void testTableApiSourceAndSink() throws Exception { + List<Order> smallOrders = ImmutableList.of(new Order("A", 5), new Order("B", 10)); + + // filter-large-orders.sql is supposed to preserve orders with quantity > 10 + List<Order> expected = + ImmutableList.of(new Order("C", 15), new Order("D", 20), new Order("E", 25)); + + smallOrders.forEach(order -> kinesisClient.sendMessage(ORDERS_STREAM, toJson(order))); + expected.forEach(order -> kinesisClient.sendMessage(ORDERS_STREAM, toJson(order))); + + executeSqlStatements(readSqlFile("filter-large-orders.sql")); + + // result order is not guaranteed + List<Order> result = readAllOrdersFromKinesis(kinesisClient); + assertThat(result).contains(expected.toArray(new Order[0])); + } + + private List<Order> readAllOrdersFromKinesis(final KinesisPubsubClient client) + throws Exception { + Deadline deadline = Deadline.fromNow(Duration.ofSeconds(5)); + List<Order> orders; + do { + Thread.sleep(1000); + orders = + client.readAllMessages(LARGE_ORDERS_STREAM).stream() + .map(order -> fromJson(order, Order.class)) + .collect(Collectors.toList()); + } while (deadline.hasTimeLeft() && orders.size() < 3); + + return orders; + } + + private List<String> readSqlFile(final String resourceName) throws Exception { + return Files.readAllLines(Paths.get(getClass().getResource("/" + resourceName).toURI())); + } + + private void executeSqlStatements(final List<String> sqlLines) throws Exception { + FLINK.submitSQLJob( + new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) + .addJars(sqlConnectorKinesisJar) + .build()); + } + + private <T> String toJson(final T object) { + try { + return OBJECT_MAPPER.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new RuntimeException("Test Failure.", e); + } + } + + private <T> T 
fromJson(final String json, final Class<T> type) { + try { + return OBJECT_MAPPER.readValue(json, type); + } catch (JsonProcessingException e) { + throw new RuntimeException("Test Failure.", e); + } + } +} diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java new file mode 100644 index 0000000..58cec30 --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.kinesis.test.model; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** POJO model class for sending and receiving records on Kinesis during e2e test. 
*/ +public class Order { + private final String code; + private final int quantity; + + public Order(@JsonProperty("code") final String code, @JsonProperty("quantity") int quantity) { + this.code = code; + this.quantity = quantity; + } + + public String getCode() { + return code; + } + + public int getQuantity() { + return quantity; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + Order order = (Order) o; + return quantity == order.quantity && Objects.equals(code, order.code); + } + + @Override + public int hashCode() { + return Objects.hash(code, quantity); + } + + @Override + public String toString() { + return "Order{" + "code='" + code + '\'' + ", quantity=" + quantity + '}'; + } +} diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/resources/filter-large-orders.sql b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/resources/filter-large-orders.sql new file mode 100644 index 0000000..fcd0866 --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/resources/filter-large-orders.sql @@ -0,0 +1,52 @@ +--/* +-- * Licensed to the Apache Software Foundation (ASF) under one +-- * or more contributor license agreements. See the NOTICE file +-- * distributed with this work for additional information +-- * regarding copyright ownership. The ASF licenses this file +-- * to you under the Apache License, Version 2.0 (the +-- * "License"); you may not use this file except in compliance +-- * with the License. You may obtain a copy of the License at +-- * +-- * http://www.apache.org/licenses/LICENSE-2.0 +-- * +-- * Unless required by applicable law or agreed to in writing, software +-- * distributed under the License is distributed on an "AS IS" BASIS, +-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */

-- Source table: reads every incoming order from the "orders" Kinesalite stream.
-- NOTE(review): unlike the sink below, no 'aws.region' is set here — presumably because the
-- legacy consumer rejects endpoint+region together; confirm before aligning the two tables.
CREATE TABLE orders (
    `code` STRING,
    `quantity` BIGINT
) WITH (
    'connector' = 'kinesis',
    'stream' = 'orders',
    'aws.endpoint' = 'https://kinesalite:4567',
    'aws.credentials.provider' = 'BASIC',
    'aws.credentials.basic.accesskeyid' = 'access key',
    'aws.credentials.basic.secretkey' = 'secret key',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'scan.shard.discovery.intervalmillis' = '1000',
    'scan.shard.adaptivereads' = 'true',
    'format' = 'json'
);

-- Sink table: receives only the orders that pass the filter below.
CREATE TABLE large_orders (
    `code` STRING,
    `quantity` BIGINT
) WITH (
    'connector' = 'kinesis',
    'stream' = 'large_orders',
    'aws.region' = 'us-east-1',
    'aws.endpoint' = 'https://kinesalite:4567',
    'aws.credentials.provider' = 'BASIC',
    'aws.credentials.basic.accesskeyid' = 'access key',
    'aws.credentials.basic.secretkey' = 'secret key',
    'aws.trust.all.certificates' = 'true',
    'sink.http-client.protocol.version' = 'HTTP1_1',
    'sink.batch.max-size' = '1',
    'format' = 'json'
);

-- Keep only "large" orders: quantity strictly greater than 10.
INSERT INTO large_orders SELECT * FROM orders WHERE quantity > 10;
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-connector-aws-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/pom.xml index b1266aa..2ae61cc 100644 --- a/flink-connector-aws-e2e-tests/pom.xml +++ b/flink-connector-aws-e2e-tests/pom.xml @@ -41,6 +41,7 @@ under the License. <modules> <module>flink-connector-aws-kinesis-firehose-e2e-tests</module> <module>flink-connector-aws-kinesis-streams-e2e-tests</module> + <module>flink-connector-kinesis-e2e-tests</module> </modules> <dependencies> diff --git a/pom.xml b/pom.xml index 091ec33..4779d4f 100644 --- a/pom.xml +++ b/pom.xml @@ -111,12 +111,6 @@ under the License. <scope>test</scope> </dependency> - <dependency> - <groupId>org.assertj</groupId> - <artifactId>assertj-core</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-inline</artifactId>
