This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 89c9f62ec8327f154ce3e8b3fbaf32cad735fd7d Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Fri May 15 18:33:05 2020 +0800 [FLINK-17516] [e2e] Add verification app for exactly-once E2E --- statefun-e2e-tests/pom.xml | 1 + .../statefun-exactly-once-e2e/pom.xml | 109 +++++++++++++++++++++ .../flink/statefun/e2e/exactlyonce/Constants.java | 39 ++++++++ .../exactlyonce/ExactlyOnceVerificationModule.java | 65 ++++++++++++ .../flink/statefun/e2e/exactlyonce/FnCounter.java | 47 +++++++++ .../statefun/e2e/exactlyonce/FnUnwrapper.java | 40 ++++++++ .../flink/statefun/e2e/exactlyonce/KafkaIO.java | 92 +++++++++++++++++ .../main/protobuf/exactly-once-verification.proto | 39 ++++++++ 8 files changed, 432 insertions(+) diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml index 218ed18..83cba8d 100644 --- a/statefun-e2e-tests/pom.xml +++ b/statefun-e2e-tests/pom.xml @@ -32,6 +32,7 @@ under the License. <module>statefun-e2e-tests-common</module> <module>statefun-sanity-e2e</module> <module>statefun-routable-kafka-e2e</module> + <module>statefun-exactly-once-e2e</module> </modules> <build> diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml new file mode 100644 index 0000000..0ea4d6a --- /dev/null +++ b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml @@ -0,0 +1,109 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>statefun-e2e-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>2.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>statefun-exactly-once-e2e</artifactId> + + <properties> + <testcontainers.version>1.12.5</testcontainers.version> + </properties> + + <dependencies> + <!-- Stateful Functions --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>statefun-sdk</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>statefun-kafka-io</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Protobuf --> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + + <!-- logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.15</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <version>3.14.6</version> + <scope>test</scope> + </dependency> + + <!-- End-to-end test common --> + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>statefun-e2e-tests-common</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <!-- Testcontainers KafkaContainer --> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>com.github.os72</groupId> + <artifactId>protoc-jar-maven-plugin</artifactId> + <version>${protoc-jar-maven-plugin.version}</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <configuration> + <excludePackageNames>org.apache.flink.statefun.e2e.exactlyonce.generated</excludePackageNames> <!-- java_package of this module's protobuf-generated classes --> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java new file mode 100644 index 0000000..ab66809 --- /dev/null +++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.e2e.exactlyonce; + +import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount; +import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage; +import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.statefun.sdk.io.IngressIdentifier; + +final class Constants { + + private Constants() {} + + static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers"; + + static final IngressIdentifier<WrappedMessage> INGRESS_ID = + new IngressIdentifier<>( + WrappedMessage.class, "org.apache.flink.e2e.exactlyonce", "wrapped-messages"); + + static final EgressIdentifier<InvokeCount> EGRESS_ID = + new EgressIdentifier<>( + "org.apache.flink.e2e.exactlyonce", "invoke-counts", InvokeCount.class); +} diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java new file mode 100644 index 0000000..9dbb465 --- /dev/null +++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.e2e.exactlyonce; + +import com.google.auto.service.AutoService; +import java.util.Map; +import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage; +import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; + +/** + * This is a simple application used for testing end-to-end exactly-once semantics. + * + * <p>The application reads {@link WrappedMessage}s from a Kafka ingress which get routed to {@link + * FnUnwrapper} functions, which in turn simply forward the messages to {@link FnCounter} functions + * with specified target keys defined in the wrapped message. The counter function keeps count of + * the number of times each key has been invoked, and sinks that count to an exactly-once delivery + * Kafka egress for verification. 
+ */ +@AutoService(StatefulFunctionModule.class) +public class ExactlyOnceVerificationModule implements StatefulFunctionModule { + + @Override + public void configure(Map<String, String> globalConfiguration, Binder binder) { + String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF); + if (kafkaBootstrapServers == null) { + throw new IllegalStateException( + "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF); + } + + configureKafkaIO(kafkaBootstrapServers, binder); + configureFunctions(binder); + } + + private static void configureKafkaIO(String kafkaAddress, Binder binder) { + final KafkaIO kafkaIO = new KafkaIO(kafkaAddress); + + binder.bindIngress(kafkaIO.getIngressSpec()); + binder.bindIngressRouter( + Constants.INGRESS_ID, + ((message, downstream) -> downstream.forward(FnUnwrapper.TYPE, message.getKey(), message))); + + binder.bindEgress(kafkaIO.getEgressSpec()); + } + + private static void configureFunctions(Binder binder) { + binder.bindFunctionProvider(FnUnwrapper.TYPE, ignored -> new FnUnwrapper()); + binder.bindFunctionProvider(FnCounter.TYPE, ignored -> new FnCounter()); + } +} diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java new file mode 100644 index 0000000..5243ebd --- /dev/null +++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.e2e.exactlyonce; + +import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount; +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.StatefulFunction; +import org.apache.flink.statefun.sdk.annotations.Persisted; +import org.apache.flink.statefun.sdk.state.PersistedValue; + +final class FnCounter implements StatefulFunction { + + static final FunctionType TYPE = new FunctionType("org.apache.flink.e2e.exactlyonce", "counter"); + + @Persisted + private final PersistedValue<Integer> invokeCountState = + PersistedValue.of("invoke-count", Integer.class); + + @Override + public void invoke(Context context, Object input) { + final int previousCount = invokeCountState.getOrDefault(0); + final int currentCount = previousCount + 1; + + final InvokeCount invokeCount = + InvokeCount.newBuilder().setId(context.self().id()).setInvokeCount(currentCount).build(); + invokeCountState.set(currentCount); + + context.send(Constants.EGRESS_ID, invokeCount); + } +} diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java new file mode 100644 index 0000000..990e545 
--- /dev/null +++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.e2e.exactlyonce; + +import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage; +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.StatefulFunction; + +final class FnUnwrapper implements StatefulFunction { + + static final FunctionType TYPE = + new FunctionType("org.apache.flink.e2e.exactlyonce", "unwrapper"); + + @Override + public void invoke(Context context, Object input) { + final WrappedMessage message = requireWrappedMessage(input); + context.send(FnCounter.TYPE, message.getInvokeTargetId(), message); + } + + private static WrappedMessage requireWrappedMessage(Object input) { + return (WrappedMessage) input; + } +} diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java 
b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java new file mode 100644 index 0000000..4df60ca --- /dev/null +++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.statefun.e2e.exactlyonce; + +import java.time.Duration; +import java.util.Objects; +import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount; +import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage; +import org.apache.flink.statefun.sdk.io.EgressSpec; +import org.apache.flink.statefun.sdk.io.IngressSpec; +import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder; +import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer; +import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder; +import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer; +import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; + +final class KafkaIO { + + static final String WRAPPED_MESSAGES_TOPIC_NAME = "wrapped-messages"; + static final String INVOKE_COUNTS_TOPIC_NAME = "invoke-counts"; + + private final String kafkaAddress; + + KafkaIO(String kafkaAddress) { + this.kafkaAddress = Objects.requireNonNull(kafkaAddress); + } + + IngressSpec<WrappedMessage> getIngressSpec() { + return KafkaIngressBuilder.forIdentifier(Constants.INGRESS_ID) + .withTopic(KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME) + .withKafkaAddress(kafkaAddress) + .withStartupPosition(KafkaIngressStartupPosition.fromEarliest()) + .withConsumerGroupId("exactly-once-e2e") + .withDeserializer(WrappedMessageKafkaDeserializer.class) + .build(); + } + + EgressSpec<InvokeCount> getEgressSpec() { + return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID) + .withKafkaAddress(kafkaAddress) + .withExactlyOnceProducerSemantics(Duration.ofMinutes(1)) + .withSerializer(InvokeCountKafkaSerializer.class) + .build(); + } + + private static final class WrappedMessageKafkaDeserializer + implements KafkaIngressDeserializer<WrappedMessage> { + + private static final long serialVersionUID 
= 1L; + + @Override + public WrappedMessage deserialize(ConsumerRecord<byte[], byte[]> input) { + try { + return WrappedMessage.parseFrom(input.value()); + } catch (Exception e) { + throw new RuntimeException("Error deserializing messages", e); + } + } + } + + private static final class InvokeCountKafkaSerializer + implements KafkaEgressSerializer<InvokeCount> { + + private static final long serialVersionUID = 1L; + + @Override + public ProducerRecord<byte[], byte[]> serialize(InvokeCount invokeCount) { + final byte[] key = invokeCount.getIdBytes().toByteArray(); + final byte[] value = invokeCount.toByteArray(); + + return new ProducerRecord<>(INVOKE_COUNTS_TOPIC_NAME, key, value); + } + } +} diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto new file mode 100644 index 0000000..5e8b41a --- /dev/null +++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +syntax = "proto3"; + +package org.apache.flink.statefun.e2e.exactlyonce; +option java_package = "org.apache.flink.statefun.e2e.exactlyonce.generated"; +option java_multiple_files = false; + +message WrappedMessage { + string invokeTargetId = 1; + string key = 2; +} + +message FnAddress { + string namespace = 1; + string type = 2; + string id = 3; +} + +message InvokeCount { + string id = 1; + int32 invokeCount = 2; +}
