This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/master by this push:
     new 6300f62  [FLINK-23709] Remove SanityVerificationE2E and 
ExactlyOnceRemoteE2E
6300f62 is described below

commit 6300f629dc84d0b15684d903e17ebcbd581282e5
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Tue Aug 10 20:37:48 2021 +0800

    [FLINK-23709] Remove SanityVerificationE2E and ExactlyOnceRemoteE2E
    
    This closes #249.
---
 statefun-e2e-tests/pom.xml                         |   2 -
 .../statefun-exactly-once-remote-e2e/pom.xml       | 104 -----------
 .../main/protobuf/remote-module-verification.proto |  32 ----
 .../src/main/python/functions.py                   |  95 ----------
 .../main/python/remote_module_verification_pb2.py  | 149 ----------------
 .../e2e/remote/ExactlyOnceWithRemoteFnE2E.java     | 194 ---------------------
 .../src/test/resources/Dockerfile                  |  20 ---
 .../src/test/resources/Dockerfile.remote-function  |  34 ----
 .../src/test/resources/log4j.properties            |  24 ---
 .../src/test/resources/remote-module/module.yaml   |  41 -----
 .../src/test/resources/requirements.txt            |  21 ---
 statefun-e2e-tests/statefun-sanity-e2e/pom.xml     | 106 -----------
 .../flink/statefun/e2e/sanity/Constants.java       |  43 -----
 .../statefun/e2e/sanity/FnCommandResolver.java     |  98 -----------
 .../apache/flink/statefun/e2e/sanity/KafkaIO.java  |  90 ----------
 .../e2e/sanity/SanityVerificationModule.java       |  75 --------
 .../apache/flink/statefun/e2e/sanity/Utils.java    |  33 ----
 .../src/main/protobuf/verification-messages.proto  |  73 --------
 .../statefun/e2e/sanity/SanityVerificationE2E.java | 165 ------------------
 .../src/test/resources/Dockerfile                  |  20 ---
 .../src/test/resources/log4j.properties            |  24 ---
 21 files changed, 1443 deletions(-)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 9a3dae9..d00e6df 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -34,8 +34,6 @@ under the License.
 
     <modules>
         <module>statefun-e2e-tests-common</module>
-        <module>statefun-sanity-e2e</module>
-        <module>statefun-exactly-once-remote-e2e</module>
         <module>statefun-smoke-e2e-common</module>
         <module>statefun-smoke-e2e-driver</module>
         <module>statefun-smoke-e2e-embedded</module>
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/pom.xml 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/pom.xml
deleted file mode 100644
index 9f6d5ff..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/pom.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xmlns="http://maven.apache.org/POM/4.0.0";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <parent>
-        <artifactId>statefun-e2e-tests</artifactId>
-        <groupId>org.apache.flink</groupId>
-        <version>3.1-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>statefun-exactly-once-remote-e2e</artifactId>
-
-    <properties>
-        <kafka.version>2.2.0</kafka.version>
-    </properties>
-
-    <dependencies>
-        <!-- Kafka clients -->
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${kafka.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <!-- Protobuf -->
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
-        </dependency>
-
-        <!-- logging -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.15</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>1.2.17</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- End-to-end test common -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-e2e-tests-common</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- Testcontainers KafkaContainer -->
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>kafka</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>com.github.os72</groupId>
-                <artifactId>protoc-jar-maven-plugin</artifactId>
-                <version>${protoc-jar-maven-plugin.version}</version>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
-                <configuration>
-                    
<excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/protobuf/remote-module-verification.proto
 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/protobuf/remote-module-verification.proto
deleted file mode 100644
index 3e877ee..0000000
--- 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/protobuf/remote-module-verification.proto
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-package org.apache.flink.statefun.e2e.remote;
-option java_package = "org.apache.flink.statefun.e2e.remote.generated";
-option java_multiple_files = false;
-
-message Invoke {
-}
-
-
-message InvokeResult {
-    string id = 1;
-    int32 invoke_count = 2;
-}
diff --git 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/functions.py
 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/functions.py
deleted file mode 100644
index 0c9f017..0000000
--- 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/functions.py
+++ /dev/null
@@ -1,95 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-import uuid
-from statefun import *
-
-from remote_module_verification_pb2 import Invoke, InvokeResult
-
-InvokeType = make_protobuf_type(namespace="statefun.e2e", cls=Invoke)
-InvokeResultType = make_protobuf_type(namespace="statefun.e2e", 
cls=InvokeResult)
-
-functions = StatefulFunctions()
-
-
[email protected](
-    typename="org.apache.flink.statefun.e2e.remote/counter",
-    specs=[ValueSpec(name='invoke_count', type=IntType)])
-def counter(context, message):
-    """
-    Keeps count of the number of invocations, and forwards that count
-    to be sent to the Kafka egress. We do the extra forwarding instead
-    of directly sending to Kafka, so that we cover inter-function
-    messaging in our E2E test.
-    """
-    n = context.storage.invoke_count or 0
-    n += 1
-    context.storage.invoke_count = n
-
-    response = InvokeResult()
-    response.id = context.address.id
-    response.invoke_count = n
-
-    context.send(
-        
message_builder(target_typename="org.apache.flink.statefun.e2e.remote/forward-function",
-                        # use random keys to simulate both local handovers and
-                        # cross-partition messaging via the feedback loop
-                        target_id=uuid.uuid4().hex,
-                        value=response,
-                        value_type=InvokeResultType))
-
-
[email protected]("org.apache.flink.statefun.e2e.remote/forward-function")
-def forward_to_egress(context, message):
-    """
-    Simply forwards the results to the Kafka egress.
-    """
-    invoke_result = message.as_type(InvokeResultType)
-
-    egress_message = kafka_egress_message(
-        typename="org.apache.flink.statefun.e2e.remote/invoke-results",
-        topic="invoke-results",
-        key=invoke_result.id,
-        value=invoke_result,
-        value_type=InvokeResultType)
-    context.send_egress(egress_message)
-
-
-handler = RequestReplyHandler(functions)
-
-#
-# Serve the endpoint
-#
-
-from flask import request
-from flask import make_response
-from flask import Flask
-
-app = Flask(__name__)
-
-
[email protected]('/service', methods=['POST'])
-def handle():
-    response_data = handler.handle_sync(request.data)
-    response = make_response(response_data)
-    response.headers.set('Content-Type', 'application/octet-stream')
-    return response
-
-
-if __name__ == "__main__":
-    app.run()
diff --git 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/remote_module_verification_pb2.py
 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/remote_module_verification_pb2.py
deleted file mode 100644
index 8e0a556..0000000
--- 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/main/python/remote_module_verification_pb2.py
+++ /dev/null
@@ -1,149 +0,0 @@
-# -*- coding: utf-8 -*-
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# source: remote-module-verification.proto
-
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
-from google.protobuf import descriptor as _descriptor
-from google.protobuf import message as _message
-from google.protobuf import reflection as _reflection
-from google.protobuf import symbol_database as _symbol_database
-# @@protoc_insertion_point(imports)
-
-_sym_db = _symbol_database.Default()
-
-
-
-
-DESCRIPTOR = _descriptor.FileDescriptor(
-  name='remote-module-verification.proto',
-  package='org.apache.flink.statefun.e2e.remote',
-  syntax='proto3',
-  
serialized_options=_b('\n.org.apache.flink.statefun.e2e.remote.generatedP\000'),
-  serialized_pb=_b('\n 
remote-module-verification.proto\x12$org.apache.flink.statefun.e2e.remote\"\x08\n\x06Invoke\"\x1c\n\x0bInvokeCount\x12\r\n\x05\x63ount\x18\x01
 \x01(\x05\"0\n\x0cInvokeResult\x12\n\n\x02id\x18\x01 
\x01(\t\x12\x14\n\x0cinvoke_count\x18\x02 
\x01(\x05\x42\x32\n.org.apache.flink.statefun.e2e.remote.generatedP\x00\x62\x06proto3')
-)
-
-
-
-
-_INVOKE = _descriptor.Descriptor(
-  name='Invoke',
-  full_name='org.apache.flink.statefun.e2e.remote.Invoke',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=74,
-  serialized_end=82,
-)
-
-
-_INVOKECOUNT = _descriptor.Descriptor(
-  name='InvokeCount',
-  full_name='org.apache.flink.statefun.e2e.remote.InvokeCount',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='count', 
full_name='org.apache.flink.statefun.e2e.remote.InvokeCount.count', index=0,
-      number=1, type=5, cpp_type=1, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=84,
-  serialized_end=112,
-)
-
-
-_INVOKERESULT = _descriptor.Descriptor(
-  name='InvokeResult',
-  full_name='org.apache.flink.statefun.e2e.remote.InvokeResult',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='id', 
full_name='org.apache.flink.statefun.e2e.remote.InvokeResult.id', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='invoke_count', 
full_name='org.apache.flink.statefun.e2e.remote.InvokeResult.invoke_count', 
index=1,
-      number=2, type=5, cpp_type=1, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  serialized_options=None,
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=114,
-  serialized_end=162,
-)
-
-DESCRIPTOR.message_types_by_name['Invoke'] = _INVOKE
-DESCRIPTOR.message_types_by_name['InvokeCount'] = _INVOKECOUNT
-DESCRIPTOR.message_types_by_name['InvokeResult'] = _INVOKERESULT
-_sym_db.RegisterFileDescriptor(DESCRIPTOR)
-
-Invoke = _reflection.GeneratedProtocolMessageType('Invoke', 
(_message.Message,), dict(
-  DESCRIPTOR = _INVOKE,
-  __module__ = 'remote_module_verification_pb2'
-  # 
@@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.Invoke)
-  ))
-_sym_db.RegisterMessage(Invoke)
-
-InvokeCount = _reflection.GeneratedProtocolMessageType('InvokeCount', 
(_message.Message,), dict(
-  DESCRIPTOR = _INVOKECOUNT,
-  __module__ = 'remote_module_verification_pb2'
-  # 
@@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.InvokeCount)
-  ))
-_sym_db.RegisterMessage(InvokeCount)
-
-InvokeResult = _reflection.GeneratedProtocolMessageType('InvokeResult', 
(_message.Message,), dict(
-  DESCRIPTOR = _INVOKERESULT,
-  __module__ = 'remote_module_verification_pb2'
-  # 
@@protoc_insertion_point(class_scope:org.apache.flink.statefun.e2e.remote.InvokeResult)
-  ))
-_sym_db.RegisterMessage(InvokeResult)
-
-
-DESCRIPTOR._options = None
-# @@protoc_insertion_point(module_scope)
diff --git 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
deleted file mode 100644
index 091c1fa..0000000
--- 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.remote;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.Random;
-import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
-import 
org.apache.flink.statefun.e2e.remote.generated.RemoteModuleVerification.Invoke;
-import 
org.apache.flink.statefun.e2e.remote.generated.RemoteModuleVerification.InvokeResult;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.images.builder.ImageFromDockerfile;
-
-/**
- * Exactly-once end-to-end test with a completely YAML-based remote module 
setup.
- *
- * <p>The setup consists of a auto-routable YAML Kafka ingress, the generic 
YAML Kafka egress, and
- * two Python remote functions: 1) a simple invocation counter function, which 
gets routed invoke
- * messages from the auto-routable Kafka ingress, and 2) a simple stateless 
forwarding. function,
- * which gets the invocation counts from the counter function and simply 
forwards them to the Kafka
- * egress.
- *
- * <p>We perform the extra stateless forwarding so that the E2E test scenario 
covers messaging
- * between remote functions.
- *
- * <p>After the first series of output is seen in the Kafka egress (which 
implies some checkpoints
- * have been completed since the verification application is using 
exactly-once delivery), we
- * restart a StateFun worker to simulate failure. The application should 
automatically attempt to
- * recover and eventually restart. Meanwhile, more records are written to 
Kafka again. We verify
- * that on the consumer side, the invocation counts increase sequentially for 
each key as if the
- * failure did not occur.
- */
-public class ExactlyOnceWithRemoteFnE2E {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(ExactlyOnceWithRemoteFnE2E.class);
-
-  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
-
-  private static final String KAFKA_HOST = "kafka-broker";
-  private static final String INVOKE_TOPIC = "invoke";
-  private static final String INVOKE_RESULTS_TOPIC = "invoke-results";
-
-  private static final String REMOTE_FUNCTION_HOST = "remote-function";
-
-  private static final int NUM_WORKERS = 2;
-
-  @Rule
-  public KafkaContainer kafka =
-      new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
-          .withNetworkAliases(KAFKA_HOST)
-          .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
-          .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
-
-  @Rule
-  public GenericContainer<?> remoteFunction =
-      new GenericContainer<>(remoteFunctionImage())
-          .withNetworkAliases(REMOTE_FUNCTION_HOST)
-          .withLogConsumer(new Slf4jLogConsumer(LOG));
-
-  @Rule
-  public StatefulFunctionsAppContainers verificationApp =
-      StatefulFunctionsAppContainers.builder("remote-module-verification", 
NUM_WORKERS)
-          .dependsOn(kafka)
-          .dependsOn(remoteFunction)
-          .exposeLogs(LOG)
-          .withBuildContextFileFromClasspath("remote-module", 
"/remote-module/")
-          .build();
-
-  @Test(timeout = 1000 * 60 * 10)
-  public void run() {
-    final String kafkaAddress = kafka.getBootstrapServers();
-
-    final Producer<String, Invoke> invokeProducer = 
kafkaKeyedInvokesProducer(kafkaAddress);
-    final Consumer<String, InvokeResult> invokeResultConsumer =
-        kafkaInvokeResultsConsumer(kafkaAddress);
-
-    final KafkaIOVerifier<String, Invoke, String, InvokeResult> verifier =
-        new KafkaIOVerifier<>(invokeProducer, invokeResultConsumer);
-
-    // we verify results come in any order, since the results from the counter 
function are
-    // being forwarded to the forwarding function with a random key, and 
therefore
-    // might be written to Kafka out-of-order. We specifically use random keys 
there
-    // so that the E2E may cover both local handovers and cross-partition 
messaging via the
-    // feedback loop in the remote module setup.
-    assertThat(
-        verifier.sending(invoke("foo"), invoke("foo"), invoke("bar")),
-        verifier.resultsInAnyOrder(
-            is(invokeResult("foo", 1)), is(invokeResult("foo", 2)), 
is(invokeResult("bar", 1))));
-
-    LOG.info(
-        "Restarting random worker to simulate failure. The application should 
automatically recover.");
-    verificationApp.restartWorker(randomWorkerIndex());
-
-    assertThat(
-        verifier.sending(invoke("foo"), invoke("foo"), invoke("bar")),
-        verifier.resultsInAnyOrder(
-            is(invokeResult("foo", 3)), is(invokeResult("foo", 4)), 
is(invokeResult("bar", 2))));
-  }
-
-  private static ImageFromDockerfile remoteFunctionImage() {
-    final Path pythonSourcePath = remoteFunctionPythonSourcePath();
-    LOG.info("Building remote function image with Python source at: {}", 
pythonSourcePath);
-
-    final Path pythonSdkPath = pythonSdkPath();
-    LOG.info("Located built Python SDK at: {}", pythonSdkPath);
-
-    return new ImageFromDockerfile("remote-function")
-        .withFileFromClasspath("Dockerfile", "Dockerfile.remote-function")
-        .withFileFromPath("source/", pythonSourcePath)
-        .withFileFromClasspath("requirements.txt", "requirements.txt")
-        .withFileFromPath("python-sdk/", pythonSdkPath);
-  }
-
-  private static Path remoteFunctionPythonSourcePath() {
-    return Paths.get(System.getProperty("user.dir") + "/src/main/python");
-  }
-
-  private static Path pythonSdkPath() {
-    return Paths.get(System.getProperty("user.dir") + 
"/../../statefun-sdk-python/dist");
-  }
-
-  private static Producer<String, Invoke> kafkaKeyedInvokesProducer(String 
bootstrapServers) {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", bootstrapServers);
-
-    return new KafkaProducer<>(
-        props, new StringSerializer(), new 
KafkaProtobufSerializer<>(Invoke.parser()));
-  }
-
-  private Consumer<String, InvokeResult> kafkaInvokeResultsConsumer(String 
bootstrapServers) {
-    Properties consumerProps = new Properties();
-    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
-    consumerProps.setProperty("group.id", "remote-module-e2e");
-    consumerProps.setProperty("auto.offset.reset", "earliest");
-    consumerProps.setProperty("isolation.level", "read_committed");
-
-    KafkaConsumer<String, InvokeResult> consumer =
-        new KafkaConsumer<>(
-            consumerProps,
-            new StringDeserializer(),
-            new KafkaProtobufSerializer<>(InvokeResult.parser()));
-    consumer.subscribe(Collections.singletonList(INVOKE_RESULTS_TOPIC));
-
-    return consumer;
-  }
-
-  private static ProducerRecord<String, Invoke> invoke(String target) {
-    return new ProducerRecord<>(INVOKE_TOPIC, target, 
Invoke.getDefaultInstance());
-  }
-
-  private static InvokeResult invokeResult(String id, int invokeCount) {
-    return 
InvokeResult.newBuilder().setId(id).setInvokeCount(invokeCount).build();
-  }
-
-  private static int randomWorkerIndex() {
-    return new Random().nextInt(NUM_WORKERS);
-  }
-}
diff --git 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile
 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile
deleted file mode 100644
index 9803a48..0000000
--- 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:3.1-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/statefun-remote-module-e2e
-COPY remote-module/ /opt/statefun/modules/statefun-remote-module-e2e/
-COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile.remote-function
 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile.remote-function
deleted file mode 100644
index f013414..0000000
--- 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/Dockerfile.remote-function
+++ /dev/null
@@ -1,34 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM python:3.8-alpine
-
-RUN mkdir -p /app
-WORKDIR /app
-
-COPY python-sdk/apache_flink_statefun-*-py3-none-any.whl /app
-RUN pip install apache_flink_statefun-*-py3-none-any.whl
-
-COPY requirements.txt /app
-RUN pip install -r requirements.txt
-
-COPY source/functions.py /app
-COPY source/remote_module_verification_pb2.py /app
-
-EXPOSE 8000
-
-CMD ["gunicorn", "-b", "0.0.0.0:8000", "-w 1", "functions:app"]
-
diff --git 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/log4j.properties
 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index fb965d3..0000000
--- 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
diff --git 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
deleted file mode 100644
index eb00af7..0000000
--- 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-kind: io.statefun.endpoints.v2/http
-spec:
-  functions: org.apache.flink.statefun.e2e.remote/*
-  urlPathTemplate: http://remote-function:8000/service
-  maxNumBatchRequests: 10000
----
-kind: io.statefun.kafka.v1/ingress
-spec:
-  id: org.apache.flink.statefun.e2e.remote/invoke
-  address: kafka-broker:9092
-  consumerGroupId: remote-module-e2e
-  startupPosition:
-    type: earliest
-  topics:
-    - topic: invoke
-      valueType: statefun.e2e/org.apache.flink.statefun.e2e.remote.Invoke
-      targets:
-        - org.apache.flink.statefun.e2e.remote/counter
----
-kind: io.statefun.kafka.v1/egress
-spec:
-  id: org.apache.flink.statefun.e2e.remote/invoke-results
-  address: kafka-broker:9092
-  deliverySemantic:
-    type: exactly-once
-    transactionTimeout: 15min
diff --git 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/requirements.txt
 
b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/requirements.txt
deleted file mode 100644
index 2b89e19..0000000
--- 
a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/requirements.txt
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-flask==1.1.1
-gunicorn==20.0.4
-
-
-
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/pom.xml 
b/statefun-e2e-tests/statefun-sanity-e2e/pom.xml
deleted file mode 100644
index 94fe52d..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/pom.xml
+++ /dev/null
@@ -1,106 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xmlns="http://maven.apache.org/POM/4.0.0";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <parent>
-        <artifactId>statefun-e2e-tests</artifactId>
-        <groupId>org.apache.flink</groupId>
-        <version>3.1-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>statefun-sanity-e2e</artifactId>
-
-    <dependencies>
-        <!-- Stateful Functions -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-sdk-embedded</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-kafka-io</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <!-- Protobuf -->
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
-        </dependency>
-
-        <!-- logging -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.15</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>1.2.17</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- End-to-end test common -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-e2e-tests-common</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- Testcontainers KafkaContainer -->
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>kafka</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- Flink Config -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>com.github.os72</groupId>
-                <artifactId>protoc-jar-maven-plugin</artifactId>
-                <version>${protoc-jar-maven-plugin.version}</version>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
-                <configuration>
-                    
<excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
 
b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
deleted file mode 100644
index 2f868e9..0000000
--- 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import 
org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Command;
-import 
org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.StateSnapshot;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-import org.apache.flink.statefun.sdk.io.IngressIdentifier;
-
-final class Constants {
-
-  private Constants() {}
-
-  static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
-
-  static final IngressIdentifier<Command> COMMAND_INGRESS_ID =
-      new IngressIdentifier<>(Command.class, "org.apache.flink.e2e.sanity", 
"commands");
-  static final EgressIdentifier<StateSnapshot> STATE_SNAPSHOT_EGRESS_ID =
-      new EgressIdentifier<>("org.apache.flink.e2e.sanity", "state-snapshots", 
StateSnapshot.class);
-
-  static final FunctionType[] FUNCTION_TYPES =
-      new FunctionType[] {
-        new FunctionType("org.apache.flink.e2e.sanity", "t0"),
-        new FunctionType("org.apache.flink.e2e.sanity", "t1")
-      };
-}
diff --git 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/FnCommandResolver.java
 
b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/FnCommandResolver.java
deleted file mode 100644
index b83830e..0000000
--- 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/FnCommandResolver.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages;
-import org.apache.flink.statefun.sdk.Address;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.match.MatchBinder;
-import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-/**
- * Simple stateful function that performs actions according to the command 
received.
- *
- * <ul>
- *   <li>{@link VerificationMessages.Noop} command: does not do anything.
- *   <li>{@link VerificationMessages.Send} command: sends a specified command 
to another function.
- *   <li>{@link VerificationMessages.Modify} command: increments state value 
by a specified amount,
- *       and then reflects the new state value to an egress as a {@link
- *       VerificationMessages.StateSnapshot} message.
- * </ul>
- */
-public final class FnCommandResolver extends StatefulMatchFunction {
-
-  /** Represents the {@link FunctionType} that this function is bound to. */
-  private final int fnTypeIndex;
-
-  FnCommandResolver(int fnTypeIndex) {
-    this.fnTypeIndex = fnTypeIndex;
-  }
-
-  @Persisted
-  private final PersistedValue<Integer> state = PersistedValue.of("state", 
Integer.class);
-
-  @Override
-  public void configure(MatchBinder binder) {
-    binder
-        .predicate(
-            VerificationMessages.Command.class, 
VerificationMessages.Command::hasNoop, this::noop)
-        .predicate(
-            VerificationMessages.Command.class, 
VerificationMessages.Command::hasSend, this::send)
-        .predicate(
-            VerificationMessages.Command.class,
-            VerificationMessages.Command::hasModify,
-            this::modify);
-  }
-
-  private void send(Context context, VerificationMessages.Command command) {
-    for (VerificationMessages.Command send : 
command.getSend().getCommandToSendList()) {
-      Address to = Utils.toSdkAddress(send.getTarget());
-      context.send(to, send);
-    }
-  }
-
-  private void modify(Context context, VerificationMessages.Command command) {
-    VerificationMessages.Modify modify = command.getModify();
-
-    final int nextState =
-        state.updateAndGet(old -> old == null ? modify.getDelta() : old + 
modify.getDelta());
-
-    // reflect state changes to egress
-    final VerificationMessages.FnAddress self = selfFnAddress(context);
-    final VerificationMessages.StateSnapshot result =
-        
VerificationMessages.StateSnapshot.newBuilder().setFrom(self).setState(nextState).build();
-
-    context.send(Constants.STATE_SNAPSHOT_EGRESS_ID, result);
-  }
-
-  @SuppressWarnings("unused")
-  private void noop(Context context, Object ignored) {
-    // nothing to do
-  }
-
-  private VerificationMessages.FnAddress selfFnAddress(Context context) {
-    return VerificationMessages.FnAddress.newBuilder()
-        .setType(fnTypeIndex)
-        .setId(context.self().id())
-        .build();
-  }
-}
diff --git 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/KafkaIO.java
 
b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/KafkaIO.java
deleted file mode 100644
index 503e40f..0000000
--- 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/KafkaIO.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import java.util.Objects;
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages;
-import org.apache.flink.statefun.sdk.io.EgressSpec;
-import org.apache.flink.statefun.sdk.io.IngressSpec;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-final class KafkaIO {
-
-  static final String COMMAND_TOPIC_NAME = "commands";
-  static final String STATE_SNAPSHOTS_TOPIC_NAME = "state-snapshots";
-
-  private final String kafkaAddress;
-
-  KafkaIO(String kafkaAddress) {
-    this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
-  }
-
-  IngressSpec<VerificationMessages.Command> getIngressSpec() {
-    return KafkaIngressBuilder.forIdentifier(Constants.COMMAND_INGRESS_ID)
-        .withKafkaAddress(kafkaAddress)
-        .withConsumerGroupId("sanity-itcase")
-        .withTopic(COMMAND_TOPIC_NAME)
-        .withDeserializer(CommandKafkaDeserializer.class)
-        .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
-        .build();
-  }
-
-  EgressSpec<VerificationMessages.StateSnapshot> getEgressSpec() {
-    return KafkaEgressBuilder.forIdentifier(Constants.STATE_SNAPSHOT_EGRESS_ID)
-        .withKafkaAddress(kafkaAddress)
-        .withSerializer(StateSnapshotKafkaSerializer.class)
-        .build();
-  }
-
-  private static final class CommandKafkaDeserializer
-      implements KafkaIngressDeserializer<VerificationMessages.Command> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public VerificationMessages.Command deserialize(ConsumerRecord<byte[], 
byte[]> input) {
-      try {
-        return VerificationMessages.Command.parseFrom(input.value());
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  private static final class StateSnapshotKafkaSerializer
-      implements KafkaEgressSerializer<VerificationMessages.StateSnapshot> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ProducerRecord<byte[], byte[]> serialize(
-        VerificationMessages.StateSnapshot stateSnapshot) {
-      final byte[] key = stateSnapshot.getFrom().toByteArray();
-      final byte[] value = stateSnapshot.toByteArray();
-
-      return new ProducerRecord<>(STATE_SNAPSHOTS_TOPIC_NAME, key, value);
-    }
-  }
-}
diff --git 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
 
b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
deleted file mode 100644
index 519d1c4..0000000
--- 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import static org.apache.flink.statefun.e2e.sanity.Utils.toSdkAddress;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.flink.statefun.sdk.Address;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-
-/**
- * A simple application used for sanity verification.
- *
- * <p>The application reads commands from a Kafka ingress, binds multiple 
functions that reacts to
- * the commands (see class-level Javadoc) of {@link SanityVerificationModule} 
for a full description
- * on the set of commands), and reflects any state updates in the functions 
back to a Kafka egress.
- */
-@AutoService(StatefulFunctionModule.class)
-public class SanityVerificationModule implements StatefulFunctionModule {
-
-  @Override
-  public void configure(Map<String, String> globalConfiguration, Binder 
binder) {
-    String kafkaBootstrapServers = 
globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
-    if (kafkaBootstrapServers == null) {
-      throw new IllegalStateException(
-          "Missing required global configuration " + 
Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
-    }
-
-    configureKafkaIO(kafkaBootstrapServers, binder);
-    configureCommandRouter(binder);
-    configureCommandResolverFunctions(binder);
-  }
-
-  private static void configureKafkaIO(String kafkaAddress, Binder binder) {
-    final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
-    binder.bindIngress(kafkaIO.getIngressSpec());
-    binder.bindEgress(kafkaIO.getEgressSpec());
-  }
-
-  private static void configureCommandRouter(Binder binder) {
-    binder.bindIngressRouter(
-        Constants.COMMAND_INGRESS_ID,
-        (command, downstream) -> {
-          Address target = toSdkAddress(command.getTarget());
-          downstream.forward(target, command);
-        });
-  }
-
-  private static void configureCommandResolverFunctions(Binder binder) {
-    int index = 0;
-    for (FunctionType functionType : Constants.FUNCTION_TYPES) {
-      final int typeIndex = index;
-      binder.bindFunctionProvider(functionType, ignored -> new 
FnCommandResolver(typeIndex));
-      index++;
-    }
-  }
-}
diff --git 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Utils.java
 
b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Utils.java
deleted file mode 100644
index aec4a04..0000000
--- 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Utils.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages;
-import org.apache.flink.statefun.sdk.Address;
-import org.apache.flink.statefun.sdk.FunctionType;
-
-final class Utils {
-
-  private Utils() {}
-
-  static Address toSdkAddress(VerificationMessages.FnAddress target) {
-    FunctionType targetType = Constants.FUNCTION_TYPES[target.getType()];
-    return new Address(targetType, target.getId());
-  }
-}
diff --git 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/protobuf/verification-messages.proto
 
b/statefun-e2e-tests/statefun-sanity-e2e/src/main/protobuf/verification-messages.proto
deleted file mode 100644
index 66e6561..0000000
--- 
a/statefun-e2e-tests/statefun-sanity-e2e/src/main/protobuf/verification-messages.proto
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-package org.apache.flink.statefun.e2e.sanity;
-option java_package = "org.apache.flink.statefun.e2e.sanity.generated";
-option java_multiple_files = false;
-
-/*
- * A command is addressed to a specific target funcction and triggers some 
action by that target.
- * Commands can be nested to an arbitrary depth.
- */
-message Command {
-    FnAddress target = 1;
-
-    oneof kind {
-        Noop noop = 2;
-        Send send = 3;
-        Modify modify = 4;
-    }
-}
-
-/*
- * Sent by functions to egress after modifying its state.
- */
-message StateSnapshot {
-    FnAddress from = 1;
-    int32 state = 2;
-}
-
-/*
- * Target function address of commands.
- */
-message FnAddress {
-    int32 type = 1;
-    string id = 2;
-}
-
-/*
- * Modify the function's state by adding @delta to it's state.
- */
-message Modify {
-    int32 delta = 1;
-}
-
-/*
- * Send 1 or more commands to different recipients.
- */
-message Send {
-    repeated Command commandToSend = 2;
-}
-
-/*
- * A dummy command, does not require any action by the recipient.
- */
-message Noop {
-}
diff --git 
a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
 
b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
deleted file mode 100644
index d9ae8be..0000000
--- 
a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.sanity;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import java.util.Collections;
-import java.util.Properties;
-import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
-import 
org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Command;
-import 
org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.FnAddress;
-import 
org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Modify;
-import 
org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Noop;
-import 
org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Send;
-import 
org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.StateSnapshot;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-
-/**
- * Sanity verification end-to-end test based on the {@link 
SanityVerificationModule} application.
- *
- * <p>The test setups Kafka brokers and the verification application using 
Docker, sends a few
- * commands to Kafka to be consumed by the application, and finally verifies 
that outputs sent to
- * Kafka from the application are correct.
- */
-public class SanityVerificationE2E {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(SanityVerificationE2E.class);
-
-  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
-  private static final String KAFKA_HOST = "kafka-broker";
-
-  @Rule
-  public KafkaContainer kafka =
-      new 
KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
-
-  @Rule
-  public StatefulFunctionsAppContainers verificationApp =
-      StatefulFunctionsAppContainers.builder("sanity-verification", 2)
-          .dependsOn(kafka)
-          .exposeLogs(LOG)
-          .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
-          .build();
-
-  @Test(timeout = 60_000L)
-  public void run() throws Exception {
-    final String kafkaAddress = kafka.getBootstrapServers();
-
-    final Producer<FnAddress, Command> commandProducer = 
kafkaCommandProducer(kafkaAddress);
-    final Consumer<FnAddress, StateSnapshot> stateSnapshotConsumer =
-        kafkaStateSnapshotConsumer(kafkaAddress);
-
-    final KafkaIOVerifier<FnAddress, Command, FnAddress, StateSnapshot> 
verifier =
-        new KafkaIOVerifier<>(commandProducer, stateSnapshotConsumer);
-
-    assertThat(
-        verifier.sending(
-            producerRecord(modifyAction(fnAddress(0, "id-1"), 100)),
-            producerRecord(modifyAction(fnAddress(0, "id-2"), 300)),
-            producerRecord(modifyAction(fnAddress(1, "id-3"), 200)),
-            producerRecord(
-                sendAction(fnAddress(1, "id-2"), modifyAction(fnAddress(0, 
"id-2"), 50))),
-            producerRecord(sendAction(fnAddress(0, "id-1"), 
noOpAction(fnAddress(1, "id-1"))))),
-        verifier.resultsInOrder(
-            is(stateSnapshot(fnAddress(0, "id-1"), 100)),
-            is(stateSnapshot(fnAddress(0, "id-2"), 300)),
-            is(stateSnapshot(fnAddress(1, "id-3"), 200)),
-            is(stateSnapshot(fnAddress(0, "id-2"), 350))));
-  }
-
-  // 
=================================================================================
-  //  Kafka IO utility methods
-  // 
=================================================================================
-
-  private static Producer<FnAddress, Command> kafkaCommandProducer(String 
bootstrapServers) {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", bootstrapServers);
-
-    return new KafkaProducer<>(
-        props,
-        new KafkaProtobufSerializer<>(FnAddress.parser()),
-        new KafkaProtobufSerializer<>(Command.parser()));
-  }
-
-  private static Consumer<FnAddress, StateSnapshot> kafkaStateSnapshotConsumer(
-      String bootstrapServers) {
-    Properties consumerProps = new Properties();
-    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
-    consumerProps.setProperty("group.id", "sanity-itcase");
-    consumerProps.setProperty("auto.offset.reset", "earliest");
-
-    KafkaConsumer<FnAddress, StateSnapshot> consumer =
-        new KafkaConsumer<>(
-            consumerProps,
-            new KafkaProtobufSerializer<>(FnAddress.parser()),
-            new KafkaProtobufSerializer<>(StateSnapshot.parser()));
-    
consumer.subscribe(Collections.singletonList(KafkaIO.STATE_SNAPSHOTS_TOPIC_NAME));
-
-    return consumer;
-  }
-
-  private static ProducerRecord<FnAddress, Command> producerRecord(Command 
command) {
-    return new ProducerRecord<>(KafkaIO.COMMAND_TOPIC_NAME, 
command.getTarget(), command);
-  }
-
-  // 
=================================================================================
-  //  Protobuf message building utilities
-  // 
=================================================================================
-
-  private static StateSnapshot stateSnapshot(FnAddress fromFnAddress, int 
stateSnapshotValue) {
-    return 
StateSnapshot.newBuilder().setFrom(fromFnAddress).setState(stateSnapshotValue).build();
-  }
-
-  private static Command sendAction(FnAddress targetAddress, Command 
commandToSend) {
-    final Send sendAction = 
Send.newBuilder().addCommandToSend(commandToSend).build();
-
-    return 
Command.newBuilder().setTarget(targetAddress).setSend(sendAction).build();
-  }
-
-  private static Command modifyAction(FnAddress targetAddress, int 
stateValueDelta) {
-    final Modify modifyAction = 
Modify.newBuilder().setDelta(stateValueDelta).build();
-
-    return 
Command.newBuilder().setTarget(targetAddress).setModify(modifyAction).build();
-  }
-
-  private static Command noOpAction(FnAddress targetAddress) {
-    return 
Command.newBuilder().setTarget(targetAddress).setNoop(Noop.getDefaultInstance()).build();
-  }
-
-  private static FnAddress fnAddress(int typeIndex, String fnId) {
-    if (typeIndex > Constants.FUNCTION_TYPES.length - 1) {
-      throw new IndexOutOfBoundsException(
-          "Type index is out of bounds. Max index: " + 
(Constants.FUNCTION_TYPES.length - 1));
-    }
-    return FnAddress.newBuilder().setType(typeIndex).setId(fnId).build();
-  }
-}
diff --git 
a/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile 
b/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile
deleted file mode 100644
index fb58374..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:3.1-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/statefun-sanity-e2e
-COPY statefun-sanity-e2e*.jar /opt/statefun/modules/statefun-sanity-e2e/
-COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git 
a/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/log4j.properties 
b/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index fb965d3..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n

Reply via email to