This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
The following commit(s) were added to refs/heads/master by this push:
new 6300f62 [FLINK-23709] Remove SanityVerificationE2E and
ExactlyOnceRemoteE2E
6300f62 is described below
commit 6300f629dc84d0b15684d903e17ebcbd581282e5
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Tue Aug 10 20:37:48 2021 +0800
[FLINK-23709] Remove SanityVerificationE2E and ExactlyOnceRemoteE2E
This closes #249.
---
statefun-e2e-tests/pom.xml | 2 -
.../statefun-exactly-once-remote-e2e/pom.xml | 104 -----------
.../main/protobuf/remote-module-verification.proto | 32 ----
.../src/main/python/functions.py | 95 ----------
.../main/python/remote_module_verification_pb2.py | 149 ----------------
.../e2e/remote/ExactlyOnceWithRemoteFnE2E.java | 194 ---------------------
.../src/test/resources/Dockerfile | 20 ---
.../src/test/resources/Dockerfile.remote-function | 34 ----
.../src/test/resources/log4j.properties | 24 ---
.../src/test/resources/remote-module/module.yaml | 41 -----
.../src/test/resources/requirements.txt | 21 ---
statefun-e2e-tests/statefun-sanity-e2e/pom.xml | 106 -----------
.../flink/statefun/e2e/sanity/Constants.java | 43 -----
.../statefun/e2e/sanity/FnCommandResolver.java | 98 -----------
.../apache/flink/statefun/e2e/sanity/KafkaIO.java | 90 ----------
.../e2e/sanity/SanityVerificationModule.java | 75 --------
.../apache/flink/statefun/e2e/sanity/Utils.java | 33 ----
.../src/main/protobuf/verification-messages.proto | 73 --------
.../statefun/e2e/sanity/SanityVerificationE2E.java | 165 ------------------
.../src/test/resources/Dockerfile | 20 ---
.../src/test/resources/log4j.properties | 24 ---
21 files changed, 1443 deletions(-)
diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 9a3dae9..d00e6df 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -34,8 +34,6 @@ under the License.
<modules>
<module>statefun-e2e-tests-common</module>
- <module>statefun-sanity-e2e</module>
- <module>statefun-exactly-once-remote-e2e</module>
<module>statefun-smoke-e2e-common</module>
<module>statefun-smoke-e2e-driver</module>
<module>statefun-smoke-e2e-embedded</module>
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/pom.xml
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/pom.xml
deleted file mode 100644
index 9f6d5ff..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/pom.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>statefun-e2e-tests</artifactId>
- <groupId>org.apache.flink</groupId>
- <version>3.1-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>statefun-exactly-once-remote-e2e</artifactId>
-
- <properties>
- <kafka.version>2.2.0</kafka.version>
- </properties>
-
- <dependencies>
- <!-- Kafka clients -->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- Protobuf -->
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
-
- <!-- logging -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.15</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- <scope>test</scope>
- </dependency>
-
- <!-- End-to-end test common -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-e2e-tests-common</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Testcontainers KafkaContainer -->
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>kafka</artifactId>
- <version>${testcontainers.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>com.github.os72</groupId>
- <artifactId>protoc-jar-maven-plugin</artifactId>
- <version>${protoc-jar-maven-plugin.version}</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
-
<excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/protobuf/remote-module-verification.proto
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/protobuf/remote-module-verification.proto
deleted file mode 100644
index 3e877ee..0000000
---
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/protobuf/remote-module-verification.proto
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-package org.apache.flink.statefun.e2e.remote;
-option java_package = "org.apache.flink.statefun.e2e.remote.generated";
-option java_multiple_files = false;
-
-message Invoke {
-}
-
-
-message InvokeResult {
- string id = 1;
- int32 invoke_count = 2;
-}
diff --git
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/functions.py
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/functions.py
deleted file mode 100644
index 0c9f017..0000000
---
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/functions.py
+++ /dev/null
@@ -1,95 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-import uuid
-from statefun import *
-
-from remote_module_verification_pb2 import Invoke, InvokeResult
-
-InvokeType = make_protobuf_type(namespace="statefun.e2e", cls=Invoke)
-InvokeResultType = make_protobuf_type(namespace="statefun.e2e",
cls=InvokeResult)
-
-functions = StatefulFunctions()
-
-
-@functions.bind(
- typename="org.apache.flink.statefun.e2e.remote/counter",
- specs=[ValueSpec(name='invoke_count', type=IntType)])
-def counter(context, message):
- """
- Keeps count of the number of invocations, and forwards that count
- to be sent to the Kafka egress. We do the extra forwarding instead
- of directly sending to Kafka, so that we cover inter-function
- messaging in our E2E test.
- """
- n = context.storage.invoke_count or 0
- n += 1
- context.storage.invoke_count = n
-
- response = InvokeResult()
- response.id = context.address.id
- response.invoke_count = n
-
- context.send(
-
message_builder(target_typename="org.apache.flink.statefun.e2e.remote/forward-function",
- # use random keys to simulate both local handovers and
- # cross-partition messaging via the feedback loop
- target_id=uuid.uuid4().hex,
- value=response,
- value_type=InvokeResultType))
-
-
-@functions.bind("org.apache.flink.statefun.e2e.remote/forward-function")
-def forward_to_egress(context, message):
- """
- Simply forwards the results to the Kafka egress.
- """
- invoke_result = message.as_type(InvokeResultType)
-
- egress_message = kafka_egress_message(
- typename="org.apache.flink.statefun.e2e.remote/invoke-results",
- topic="invoke-results",
- key=invoke_result.id,
- value=invoke_result,
- value_type=InvokeResultType)
- context.send_egress(egress_message)
-
-
-handler = RequestReplyHandler(functions)
-
-#
-# Serve the endpoint
-#
-
-from flask import request
-from flask import make_response
-from flask import Flask
-
-app = Flask(__name__)
-
-
-@app.route('/service', methods=['POST'])
-def handle():
- response_data = handler.handle_sync(request.data)
- response = make_response(response_data)
- response.headers.set('Content-Type', 'application/octet-stream')
- return response
-
-
-if __name__ == "__main__":
- app.run()
diff --git
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/remote_module_verification_pb2.py
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/remote_module_verification_pb2.py
deleted file mode 100644
index 8e0a556..0000000
---
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/remote_module_verification_pb2.py
+++ /dev/null
@@ -1,149 +0,0 @@
-# -*- coding: utf-8 -*-
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: remote-module-verification.proto
-
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
-from google.protobuf import descriptor as _descriptor
-from google.protobuf import message as _message
-from google.protobuf import reflection as _reflection
-from google.protobuf import symbol_database as _symbol_database
-# @@protoc_insertion_point(imports)
-
-_sym_db = _symbol_database.Default()
-
-
-
-
-DESCRIPTOR = _descriptor.FileDescriptor(
- name='remote-module-verification.proto',
- package='org.apache.flink.statefun.e2e.remote',
- syntax='proto3',
-
serialized_options=_b('\n.org.apache.flink.statefun.e2e.remote.generatedP\000'),
- serialized_pb=_b('\n
remote-module-verification.proto\x12$org.apache.flink.statefun.e2e.remote\"\x08\n\x06Invoke\"\x1c\n\x0bInvokeCount\x12\r\n\x05\x63ount\x18\x01
\x01(\x05\"0\n\x0cInvokeResult\x12\n\n\x02id\x18\x01
\x01(\t\x12\x14\n\x0cinvoke_count\x18\x02
\x01(\x05\x42\x32\n.org.apache.flink.statefun.e2e.remote.generatedP\x00\x62\x06proto3')
-)
-
-
-
-
-_INVOKE = _descriptor.Descriptor(
- name='Invoke',
- full_name='org.apache.flink.statefun.e2e.remote.Invoke',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- serialized_options=None,
- is_extendable=False,
- syntax='proto3',
- extension_ranges=[],
- oneofs=[
- ],
- serialized_start=74,
- serialized_end=82,
-)
-
-
-_INVOKECOUNT = _descriptor.Descriptor(
- name='InvokeCount',
- full_name='org.apache.flink.statefun.e2e.remote.InvokeCount',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- _descriptor.FieldDescriptor(
- name='count',
full_name='org.apache.flink.statefun.e2e.remote.InvokeCount.count', index=0,
- number=1, type=5, cpp_type=1, label=1,
- has_default_value=False, default_value=0,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- serialized_options=None,
- is_extendable=False,
- syntax='proto3',
- extension_ranges=[],
- oneofs=[
- ],
- serialized_start=84,
- serialized_end=112,
-)
-
-
-_INVOKERESULT = _descriptor.Descriptor(
- name='InvokeResult',
- full_name='org.apache.flink.statefun.e2e.remote.InvokeResult',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- _descriptor.FieldDescriptor(
- name='id',
full_name='org.apache.flink.statefun.e2e.remote.InvokeResult.id', index=0,
- number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
- _descriptor.FieldDescriptor(
- name='invoke_count',
full_name='org.apache.flink.statefun.e2e.remote.InvokeResult.invoke_count',
index=1,
- number=2, type=5, cpp_type=1, label=1,
- has_default_value=False, default_value=0,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- serialized_options=None,
- is_extendable=False,
- syntax='proto3',
- extension_ranges=[],
- oneofs=[
- ],
- serialized_start=114,
- serialized_end=162,
-)
-
-DESCRIPTOR.message_types_by_name['Invoke'] = _INVOKE
-DESCRIPTOR.message_types_by_name['InvokeCount'] = _INVOKECOUNT
-DESCRIPTOR.message_types_by_name['InvokeResult'] = _INVOKERESULT
-_sym_db.RegisterFileDescriptor(DESCRIPTOR)
-
-Invoke = _reflection.GeneratedProtocolMessageType('Invoke',
(_message.Message,), dict(
- DESCRIPTOR = _INVOKE,
- __module__ = 'remote_module_verification_pb2'
- #
@@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.Invoke)
- ))
-_sym_db.RegisterMessage(Invoke)
-
-InvokeCount = _reflection.GeneratedProtocolMessageType('InvokeCount',
(_message.Message,), dict(
- DESCRIPTOR = _INVOKECOUNT,
- __module__ = 'remote_module_verification_pb2'
- #
@@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.InvokeCount)
- ))
-_sym_db.RegisterMessage(InvokeCount)
-
-InvokeResult = _reflection.GeneratedProtocolMessageType('InvokeResult',
(_message.Message,), dict(
- DESCRIPTOR = _INVOKERESULT,
- __module__ = 'remote_module_verification_pb2'
- #
@@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.InvokeResult)
- ))
-_sym_db.RegisterMessage(InvokeResult)
-
-
-DESCRIPTOR._options = None
-# @@protoc_insertion_point(module_scope)
diff --git
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
deleted file mode 100644
index 091c1fa..0000000
---
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.remote;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.Random;
-import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
-import
org.apache.flink.statefun.e2e.remote.generated.RemoteModuleVerification.Invoke;
-import
org.apache.flink.statefun.e2e.remote.generated.RemoteModuleVerification.InvokeResult;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.images.builder.ImageFromDockerfile;
-
-/**
- * Exactly-once end-to-end test with a completely YAML-based remote module
setup.
- *
- * <p>The setup consists of a auto-routable YAML Kafka ingress, the generic
YAML Kafka egress, and
- * two Python remote functions: 1) a simple invocation counter function, which
gets routed invoke
- * messages from the auto-routable Kafka ingress, and 2) a simple stateless
forwarding. function,
- * which gets the invocation counts from the counter function and simply
forwards them to the Kafka
- * egress.
- *
- * <p>We perform the extra stateless forwarding so that the E2E test scenario
covers messaging
- * between remote functions.
- *
- * <p>After the first series of output is seen in the Kafka egress (which
implies some checkpoints
- * have been completed since the verification application is using
exactly-once delivery), we
- * restart a StateFun worker to simulate failure. The application should
automatically attempt to
- * recover and eventually restart. Meanwhile, more records are written to
Kafka again. We verify
- * that on the consumer side, the invocation counts increase sequentially for
each key as if the
- * failure did not occur.
- */
-public class ExactlyOnceWithRemoteFnE2E {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ExactlyOnceWithRemoteFnE2E.class);
-
- private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
-
- private static final String KAFKA_HOST = "kafka-broker";
- private static final String INVOKE_TOPIC = "invoke";
- private static final String INVOKE_RESULTS_TOPIC = "invoke-results";
-
- private static final String REMOTE_FUNCTION_HOST = "remote-function";
-
- private static final int NUM_WORKERS = 2;
-
- @Rule
- public KafkaContainer kafka =
- new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
- .withNetworkAliases(KAFKA_HOST)
- .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
- .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
-
- @Rule
- public GenericContainer<?> remoteFunction =
- new GenericContainer<>(remoteFunctionImage())
- .withNetworkAliases(REMOTE_FUNCTION_HOST)
- .withLogConsumer(new Slf4jLogConsumer(LOG));
-
- @Rule
- public StatefulFunctionsAppContainers verificationApp =
- StatefulFunctionsAppContainers.builder("remote-module-verification",
NUM_WORKERS)
- .dependsOn(kafka)
- .dependsOn(remoteFunction)
- .exposeLogs(LOG)
- .withBuildContextFileFromClasspath("remote-module",
"/remote-module/")
- .build();
-
- @Test(timeout = 1000 * 60 * 10)
- public void run() {
- final String kafkaAddress = kafka.getBootstrapServers();
-
- final Producer<String, Invoke> invokeProducer =
kafkaKeyedInvokesProducer(kafkaAddress);
- final Consumer<String, InvokeResult> invokeResultConsumer =
- kafkaInvokeResultsConsumer(kafkaAddress);
-
- final KafkaIOVerifier<String, Invoke, String, InvokeResult> verifier =
- new KafkaIOVerifier<>(invokeProducer, invokeResultConsumer);
-
- // we verify results come in any order, since the results from the counter
function are
- // being forwarded to the forwarding function with a random key, and
therefore
- // might be written to Kafka out-of-order. We specifically use random keys
there
- // so that the E2E may cover both local handovers and cross-partition
messaging via the
- // feedback loop in the remote module setup.
- assertThat(
- verifier.sending(invoke("foo"), invoke("foo"), invoke("bar")),
- verifier.resultsInAnyOrder(
- is(invokeResult("foo", 1)), is(invokeResult("foo", 2)),
is(invokeResult("bar", 1))));
-
- LOG.info(
- "Restarting random worker to simulate failure. The application should
automatically recover.");
- verificationApp.restartWorker(randomWorkerIndex());
-
- assertThat(
- verifier.sending(invoke("foo"), invoke("foo"), invoke("bar")),
- verifier.resultsInAnyOrder(
- is(invokeResult("foo", 3)), is(invokeResult("foo", 4)),
is(invokeResult("bar", 2))));
- }
-
- private static ImageFromDockerfile remoteFunctionImage() {
- final Path pythonSourcePath = remoteFunctionPythonSourcePath();
- LOG.info("Building remote function image with Python source at: {}",
pythonSourcePath);
-
- final Path pythonSdkPath = pythonSdkPath();
- LOG.info("Located built Python SDK at: {}", pythonSdkPath);
-
- return new ImageFromDockerfile("remote-function")
- .withFileFromClasspath("Dockerfile", "Dockerfile.remote-function")
- .withFileFromPath("source/", pythonSourcePath)
- .withFileFromClasspath("requirements.txt", "requirements.txt")
- .withFileFromPath("python-sdk/", pythonSdkPath);
- }
-
- private static Path remoteFunctionPythonSourcePath() {
- return Paths.get(System.getProperty("user.dir") + "/src/main/python");
- }
-
- private static Path pythonSdkPath() {
- return Paths.get(System.getProperty("user.dir") +
"/../../statefun-sdk-python/dist");
- }
-
- private static Producer<String, Invoke> kafkaKeyedInvokesProducer(String
bootstrapServers) {
- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServers);
-
- return new KafkaProducer<>(
- props, new StringSerializer(), new
KafkaProtobufSerializer<>(Invoke.parser()));
- }
-
- private Consumer<String, InvokeResult> kafkaInvokeResultsConsumer(String
bootstrapServers) {
- Properties consumerProps = new Properties();
- consumerProps.setProperty("bootstrap.servers", bootstrapServers);
- consumerProps.setProperty("group.id", "remote-module-e2e");
- consumerProps.setProperty("auto.offset.reset", "earliest");
- consumerProps.setProperty("isolation.level", "read_committed");
-
- KafkaConsumer<String, InvokeResult> consumer =
- new KafkaConsumer<>(
- consumerProps,
- new StringDeserializer(),
- new KafkaProtobufSerializer<>(InvokeResult.parser()));
- consumer.subscribe(Collections.singletonList(INVOKE_RESULTS_TOPIC));
-
- return consumer;
- }
-
- private static ProducerRecord<String, Invoke> invoke(String target) {
- return new ProducerRecord<>(INVOKE_TOPIC, target,
Invoke.getDefaultInstance());
- }
-
- private static InvokeResult invokeResult(String id, int invokeCount) {
- return
InvokeResult.newBuilder().setId(id).setInvokeCount(invokeCount).build();
- }
-
- private static int randomWorkerIndex() {
- return new Random().nextInt(NUM_WORKERS);
- }
-}
diff --git
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile
deleted file mode 100644
index 9803a48..0000000
---
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:3.1-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/statefun-remote-module-e2e
-COPY remote-module/ /opt/statefun/modules/statefun-remote-module-e2e/
-COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile.remote-function
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile.remote-function
deleted file mode 100644
index f013414..0000000
---
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile.remote-function
+++ /dev/null
@@ -1,34 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM python:3.8-alpine
-
-RUN mkdir -p /app
-WORKDIR /app
-
-COPY python-sdk/apache_flink_statefun-*-py3-none-any.whl /app
-RUN pip install apache_flink_statefun-*-py3-none-any.whl
-
-COPY requirements.txt /app
-RUN pip install -r requirements.txt
-
-COPY source/functions.py /app
-COPY source/remote_module_verification_pb2.py /app
-
-EXPOSE 8000
-
-CMD ["gunicorn", "-b", "0.0.0.0:8000", "-w 1", "functions:app"]
-
diff --git
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/log4j.properties
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index fb965d3..0000000
---
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x
- %m%n
diff --git
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
deleted file mode 100644
index eb00af7..0000000
---
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-kind: io.statefun.endpoints.v2/http
-spec:
- functions: org.apache.flink.statefun.e2e.remote/*
- urlPathTemplate: http://remote-function:8000/service
- maxNumBatchRequests: 10000
----
-kind: io.statefun.kafka.v1/ingress
-spec:
- id: org.apache.flink.statefun.e2e.remote/invoke
- address: kafka-broker:9092
- consumerGroupId: remote-module-e2e
- startupPosition:
- type: earliest
- topics:
- - topic: invoke
- valueType: statefun.e2e/org.apache.flink.statefun.e2e.remote.Invoke
- targets:
- - org.apache.flink.statefun.e2e.remote/counter
----
-kind: io.statefun.kafka.v1/egress
-spec:
- id: org.apache.flink.statefun.e2e.remote/invoke-results
- address: kafka-broker:9092
- deliverySemantic:
- type: exactly-once
- transactionTimeout: 15min
diff --git
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/requirements.txt
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/requirements.txt
deleted file mode 100644
index 2b89e19..0000000
---
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/requirements.txt
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-flask==1.1.1
-gunicorn==20.0.4
-
-
-
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/pom.xml b/statefun-e2e-tests/statefun-sanity-e2e/pom.xml
deleted file mode 100644
index 94fe52d..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/pom.xml
+++ /dev/null
@@ -1,106 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>statefun-e2e-tests</artifactId>
- <groupId>org.apache.flink</groupId>
- <version>3.1-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>statefun-sanity-e2e</artifactId>
-
- <dependencies>
- <!-- Stateful Functions -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-sdk-embedded</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-kafka-io</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <!-- Protobuf -->
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
-
- <!-- logging -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.15</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- <scope>test</scope>
- </dependency>
-
- <!-- End-to-end test common -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-e2e-tests-common</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Testcontainers KafkaContainer -->
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>kafka</artifactId>
- <version>${testcontainers.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Flink Config -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>com.github.os72</groupId>
- <artifactId>protoc-jar-maven-plugin</artifactId>
- <version>${protoc-jar-maven-plugin.version}</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
- <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
deleted file mode 100644
index 2f868e9..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Command;
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.StateSnapshot;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-import org.apache.flink.statefun.sdk.io.IngressIdentifier;
-
-final class Constants {
-
- private Constants() {}
-
- static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
-
- static final IngressIdentifier<Command> COMMAND_INGRESS_ID =
- new IngressIdentifier<>(Command.class, "org.apache.flink.e2e.sanity", "commands");
- static final EgressIdentifier<StateSnapshot> STATE_SNAPSHOT_EGRESS_ID =
- new EgressIdentifier<>("org.apache.flink.e2e.sanity", "state-snapshots", StateSnapshot.class);
-
- static final FunctionType[] FUNCTION_TYPES =
- new FunctionType[] {
- new FunctionType("org.apache.flink.e2e.sanity", "t0"),
- new FunctionType("org.apache.flink.e2e.sanity", "t1")
- };
-}
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/FnCommandResolver.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/FnCommandResolver.java
deleted file mode 100644
index b83830e..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/FnCommandResolver.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages;
-import org.apache.flink.statefun.sdk.Address;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-/**
- * Simple stateful function that performs actions according to the command received.
- *
- * <ul>
- * <li>{@link VerificationMessages.Noop} command: does not do anything.
- * <li>{@link VerificationMessages.Send} command: sends a specified command to another function.
- * <li>{@link VerificationMessages.Modify} command: increments state value by a specified amount,
- * and then reflects the new state value to an egress as a {@link
- * VerificationMessages.StateSnapshot} message.
- * </ul>
- */
-public final class FnCommandResolver extends StatefulMatchFunction {
-
- /** Represents the {@link FunctionType} that this function is bound to. */
- private final int fnTypeIndex;
-
- FnCommandResolver(int fnTypeIndex) {
- this.fnTypeIndex = fnTypeIndex;
- }
-
- @Persisted
- private final PersistedValue<Integer> state = PersistedValue.of("state", Integer.class);
-
- @Override
- public void configure(MatchBinder binder) {
- binder
- .predicate(
- VerificationMessages.Command.class, VerificationMessages.Command::hasNoop, this::noop)
- .predicate(
- VerificationMessages.Command.class, VerificationMessages.Command::hasSend, this::send)
- .predicate(
- VerificationMessages.Command.class,
- VerificationMessages.Command::hasModify,
- this::modify);
- }
-
- private void send(Context context, VerificationMessages.Command command) {
- for (VerificationMessages.Command send : command.getSend().getCommandToSendList()) {
- Address to = Utils.toSdkAddress(send.getTarget());
- context.send(to, send);
- }
- }
-
- private void modify(Context context, VerificationMessages.Command command) {
- VerificationMessages.Modify modify = command.getModify();
-
- final int nextState =
- state.updateAndGet(old -> old == null ? modify.getDelta() : old + modify.getDelta());
-
- // reflect state changes to egress
- final VerificationMessages.FnAddress self = selfFnAddress(context);
- final VerificationMessages.StateSnapshot result =
- VerificationMessages.StateSnapshot.newBuilder().setFrom(self).setState(nextState).build();
-
- context.send(Constants.STATE_SNAPSHOT_EGRESS_ID, result);
- }
-
- @SuppressWarnings("unused")
- private void noop(Context context, Object ignored) {
- // nothing to do
- }
-
- private VerificationMessages.FnAddress selfFnAddress(Context context) {
- return VerificationMessages.FnAddress.newBuilder()
- .setType(fnTypeIndex)
- .setId(context.self().id())
- .build();
- }
-}
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/KafkaIO.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/KafkaIO.java
deleted file mode 100644
index 503e40f..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/KafkaIO.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import java.util.Objects;
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages;
-import org.apache.flink.statefun.sdk.io.EgressSpec;
-import org.apache.flink.statefun.sdk.io.IngressSpec;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-final class KafkaIO {
-
- static final String COMMAND_TOPIC_NAME = "commands";
- static final String STATE_SNAPSHOTS_TOPIC_NAME = "state-snapshots";
-
- private final String kafkaAddress;
-
- KafkaIO(String kafkaAddress) {
- this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
- }
-
- IngressSpec<VerificationMessages.Command> getIngressSpec() {
- return KafkaIngressBuilder.forIdentifier(Constants.COMMAND_INGRESS_ID)
- .withKafkaAddress(kafkaAddress)
- .withConsumerGroupId("sanity-itcase")
- .withTopic(COMMAND_TOPIC_NAME)
- .withDeserializer(CommandKafkaDeserializer.class)
- .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
- .build();
- }
-
- EgressSpec<VerificationMessages.StateSnapshot> getEgressSpec() {
- return KafkaEgressBuilder.forIdentifier(Constants.STATE_SNAPSHOT_EGRESS_ID)
- .withKafkaAddress(kafkaAddress)
- .withSerializer(StateSnapshotKafkaSerializer.class)
- .build();
- }
-
- private static final class CommandKafkaDeserializer
- implements KafkaIngressDeserializer<VerificationMessages.Command> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public VerificationMessages.Command deserialize(ConsumerRecord<byte[], byte[]> input) {
- try {
- return VerificationMessages.Command.parseFrom(input.value());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private static final class StateSnapshotKafkaSerializer
- implements KafkaEgressSerializer<VerificationMessages.StateSnapshot> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public ProducerRecord<byte[], byte[]> serialize(
- VerificationMessages.StateSnapshot stateSnapshot) {
- final byte[] key = stateSnapshot.getFrom().toByteArray();
- final byte[] value = stateSnapshot.toByteArray();
-
- return new ProducerRecord<>(STATE_SNAPSHOTS_TOPIC_NAME, key, value);
- }
- }
-}
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
deleted file mode 100644
index 519d1c4..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import static org.apache.flink.statefun.e2e.sanity.Utils.toSdkAddress;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.flink.statefun.sdk.Address;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-
-/**
- * A simple application used for sanity verification.
- *
- * <p>The application reads commands from a Kafka ingress, binds multiple functions that reacts to
- * the commands (see class-level Javadoc) of {@link SanityVerificationModule} for a full description
- * on the set of commands), and reflects any state updates in the functions back to a Kafka egress.
- */
-@AutoService(StatefulFunctionModule.class)
-public class SanityVerificationModule implements StatefulFunctionModule {
-
- @Override
- public void configure(Map<String, String> globalConfiguration, Binder binder) {
- String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
- if (kafkaBootstrapServers == null) {
- throw new IllegalStateException(
- "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
- }
-
- configureKafkaIO(kafkaBootstrapServers, binder);
- configureCommandRouter(binder);
- configureCommandResolverFunctions(binder);
- }
-
- private static void configureKafkaIO(String kafkaAddress, Binder binder) {
- final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
- binder.bindIngress(kafkaIO.getIngressSpec());
- binder.bindEgress(kafkaIO.getEgressSpec());
- }
-
- private static void configureCommandRouter(Binder binder) {
- binder.bindIngressRouter(
- Constants.COMMAND_INGRESS_ID,
- (command, downstream) -> {
- Address target = toSdkAddress(command.getTarget());
- downstream.forward(target, command);
- });
- }
-
- private static void configureCommandResolverFunctions(Binder binder) {
- int index = 0;
- for (FunctionType functionType : Constants.FUNCTION_TYPES) {
- final int typeIndex = index;
- binder.bindFunctionProvider(functionType, ignored -> new FnCommandResolver(typeIndex));
- index++;
- }
- }
-}
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Utils.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Utils.java
deleted file mode 100644
index aec4a04..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Utils.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages;
-import org.apache.flink.statefun.sdk.Address;
-import org.apache.flink.statefun.sdk.FunctionType;
-
-final class Utils {
-
- private Utils() {}
-
- static Address toSdkAddress(VerificationMessages.FnAddress target) {
- FunctionType targetType = Constants.FUNCTION_TYPES[target.getType()];
- return new Address(targetType, target.getId());
- }
-}
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/protobuf/verification-messages.proto b/statefun-e2e-tests/statefun-sanity-e2e/src/main/protobuf/verification-messages.proto
deleted file mode 100644
index 66e6561..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/protobuf/verification-messages.proto
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-package org.apache.flink.statefun.e2e.sanity;
-option java_package = "org.apache.flink.statefun.e2e.sanity.generated";
-option java_multiple_files = false;
-
-/*
- * A command is addressed to a specific target funcction and triggers some action by that target.
- * Commands can be nested to an arbitrary depth.
- */
-message Command {
- FnAddress target = 1;
-
- oneof kind {
- Noop noop = 2;
- Send send = 3;
- Modify modify = 4;
- }
-}
-
-/*
- * Sent by functions to egress after modifying its state.
- */
-message StateSnapshot {
- FnAddress from = 1;
- int32 state = 2;
-}
-
-/*
- * Target function address of commands.
- */
-message FnAddress {
- int32 type = 1;
- string id = 2;
-}
-
-/*
- * Modify the function's state by adding @delta to it's state.
- */
-message Modify {
- int32 delta = 1;
-}
-
-/*
- * Send 1 or more commands to different recipients.
- */
-message Send {
- repeated Command commandToSend = 2;
-}
-
-/*
- * A dummy command, does not require any action by the recipient.
- */
-message Noop {
-}
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
deleted file mode 100644
index d9ae8be..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import java.util.Collections;
-import java.util.Properties;
-import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Command;
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.FnAddress;
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Modify;
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Noop;
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Send;
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.StateSnapshot;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-
-/**
- * Sanity verification end-to-end test based on the {@link SanityVerificationModule} application.
- *
- * <p>The test setups Kafka brokers and the verification application using Docker, sends a few
- * commands to Kafka to be consumed by the application, and finally verifies that outputs sent to
- * Kafka from the application are correct.
- */
-public class SanityVerificationE2E {
-
- private static final Logger LOG = LoggerFactory.getLogger(SanityVerificationE2E.class);
-
- private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
- private static final String KAFKA_HOST = "kafka-broker";
-
- @Rule
- public KafkaContainer kafka =
- new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
-
- @Rule
- public StatefulFunctionsAppContainers verificationApp =
- StatefulFunctionsAppContainers.builder("sanity-verification", 2)
- .dependsOn(kafka)
- .exposeLogs(LOG)
- .withModuleGlobalConfiguration(
- Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
- .build();
-
- @Test(timeout = 60_000L)
- public void run() throws Exception {
- final String kafkaAddress = kafka.getBootstrapServers();
-
- final Producer<FnAddress, Command> commandProducer = kafkaCommandProducer(kafkaAddress);
- final Consumer<FnAddress, StateSnapshot> stateSnapshotConsumer =
- kafkaStateSnapshotConsumer(kafkaAddress);
-
- final KafkaIOVerifier<FnAddress, Command, FnAddress, StateSnapshot> verifier =
- new KafkaIOVerifier<>(commandProducer, stateSnapshotConsumer);
-
- assertThat(
- verifier.sending(
- producerRecord(modifyAction(fnAddress(0, "id-1"), 100)),
- producerRecord(modifyAction(fnAddress(0, "id-2"), 300)),
- producerRecord(modifyAction(fnAddress(1, "id-3"), 200)),
- producerRecord(
- sendAction(fnAddress(1, "id-2"), modifyAction(fnAddress(0, "id-2"), 50))),
- producerRecord(sendAction(fnAddress(0, "id-1"), noOpAction(fnAddress(1, "id-1"))))),
- verifier.resultsInOrder(
- is(stateSnapshot(fnAddress(0, "id-1"), 100)),
- is(stateSnapshot(fnAddress(0, "id-2"), 300)),
- is(stateSnapshot(fnAddress(1, "id-3"), 200)),
- is(stateSnapshot(fnAddress(0, "id-2"), 350))));
- }
-
- // =================================================================================
- // Kafka IO utility methods
- // =================================================================================
-
- private static Producer<FnAddress, Command> kafkaCommandProducer(String bootstrapServers) {
- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServers);
-
- return new KafkaProducer<>(
- props,
- new KafkaProtobufSerializer<>(FnAddress.parser()),
- new KafkaProtobufSerializer<>(Command.parser()));
- }
-
- private static Consumer<FnAddress, StateSnapshot> kafkaStateSnapshotConsumer(
- String bootstrapServers) {
- Properties consumerProps = new Properties();
- consumerProps.setProperty("bootstrap.servers", bootstrapServers);
- consumerProps.setProperty("group.id", "sanity-itcase");
- consumerProps.setProperty("auto.offset.reset", "earliest");
-
- KafkaConsumer<FnAddress, StateSnapshot> consumer =
- new KafkaConsumer<>(
- consumerProps,
- new KafkaProtobufSerializer<>(FnAddress.parser()),
- new KafkaProtobufSerializer<>(StateSnapshot.parser()));
- consumer.subscribe(Collections.singletonList(KafkaIO.STATE_SNAPSHOTS_TOPIC_NAME));
-
- return consumer;
- }
-
- private static ProducerRecord<FnAddress, Command> producerRecord(Command command) {
- return new ProducerRecord<>(KafkaIO.COMMAND_TOPIC_NAME, command.getTarget(), command);
- }
-
- // =================================================================================
- // Protobuf message building utilities
- // =================================================================================
-
- private static StateSnapshot stateSnapshot(FnAddress fromFnAddress, int stateSnapshotValue) {
- return StateSnapshot.newBuilder().setFrom(fromFnAddress).setState(stateSnapshotValue).build();
- }
-
- private static Command sendAction(FnAddress targetAddress, Command commandToSend) {
- final Send sendAction = Send.newBuilder().addCommandToSend(commandToSend).build();
-
- return Command.newBuilder().setTarget(targetAddress).setSend(sendAction).build();
- }
-
- private static Command modifyAction(FnAddress targetAddress, int stateValueDelta) {
- final Modify modifyAction = Modify.newBuilder().setDelta(stateValueDelta).build();
-
- return Command.newBuilder().setTarget(targetAddress).setModify(modifyAction).build();
- }
-
- private static Command noOpAction(FnAddress targetAddress) {
- return Command.newBuilder().setTarget(targetAddress).setNoop(Noop.getDefaultInstance()).build();
- }
-
- private static FnAddress fnAddress(int typeIndex, String fnId) {
- if (typeIndex > Constants.FUNCTION_TYPES.length - 1) {
- throw new IndexOutOfBoundsException(
- "Type index is out of bounds. Max index: " + (Constants.FUNCTION_TYPES.length - 1));
- }
- return FnAddress.newBuilder().setType(typeIndex).setId(fnId).build();
- }
-}
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile
deleted file mode 100644
index fb58374..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:3.1-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/statefun-sanity-e2e
-COPY statefun-sanity-e2e*.jar /opt/statefun/modules/statefun-sanity-e2e/
-COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index fb965d3..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n