[ https://issues.apache.org/jira/browse/FLINK-25866 ]
Fil Karnicki deleted comment on FLINK-25866:
--------------------------------------
was (Author: JIRAUSER284249):
Hi, I tried to hack something together at home to show that code running
inside a TaskManager can read a resource packaged in a .jar, but I was
unsuccessful. Does anything below strike you as a possible reason? I tried to
emulate our multi-tenant Cloudera setup. The steps I took were:
1. created a Flink and a Kafka cluster using docker-compose
{code:java}
version: "2.2"
services:
  jobmanager:
    image: flink:1.13.5-scala_2.12-java8
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.13.5-scala_2.12-java8
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
{code}
2. added the below code to statefun-flink-core's
org.apache.flink.statefun.flink.core.nettyclient.NettyClient#from
{code:java}
System.out.println("FINDME " + getResourceContentForTest("inCore.txt"));{code}
where getResourceContentForTest returns the contents of the resource wrapped in
some text that indicates whether the resource could be loaded
{code:java}
// Needs java.io.BufferedReader, java.io.IOException, java.io.InputStream,
// java.io.InputStreamReader, java.nio.charset.StandardCharsets and
// java.util.stream.Collectors.
public static String getResourceContentForTest(String resourceName) {
  StringBuilder stringBuilder = new StringBuilder("resource: ").append(resourceName);
  // Look the resource up through the class loader that loaded NettyClient.
  try (InputStream inputStream =
      NettyClient.class.getClassLoader().getResourceAsStream(resourceName)) {
    if (inputStream == null) {
      // The class loader could not find the resource.
      stringBuilder.append("inputStream is null for ").append(resourceName);
    } else {
      String text = new BufferedReader(
              new InputStreamReader(inputStream, StandardCharsets.UTF_8))
          .lines()
          .collect(Collectors.joining("\n"));
      stringBuilder.append("inputStream is ").append(text).append(". nice");
    }
  } catch (IOException e) {
    stringBuilder.append(e.getMessage());
  }
  return stringBuilder.toString();
} {code}
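To narrow down which class loader is involved, a small diagnostic along the following lines might help. It is only a sketch under assumptions: ResourceProbe and describe are hypothetical names that are not part of StateFun, and the idea would be to call describe(NettyClient.class, "inCore.txt") from the same place as the FINDME print. It reports which class loader loaded the class, which jar or directory the class came from, and whether the class loader and the thread context class loader can each see the resource.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical diagnostic helper for illustration; not part of StateFun. */
public final class ResourceProbe {

  private ResourceProbe() {}

  /**
   * Describes where {@code owner} was loaded from and whether its class loader
   * and the thread context class loader can resolve {@code resourceName}.
   */
  public static String describe(Class<?> owner, String resourceName) {
    // A null class loader means the class came from the bootstrap loader.
    ClassLoader ownerLoader = owner.getClassLoader();
    // The code source is the jar or directory the class was loaded from, if known.
    CodeSource codeSource = owner.getProtectionDomain().getCodeSource();
    URL viaOwner = ownerLoader == null
        ? ClassLoader.getSystemResource(resourceName)
        : ownerLoader.getResource(resourceName);
    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    URL viaContext = contextLoader == null ? null : contextLoader.getResource(resourceName);

    return "class=" + owner.getName()
        + "\n  loader=" + ownerLoader
        + "\n  codeSource=" + (codeSource == null ? "unknown" : codeSource.getLocation())
        + "\n  resource via class loader=" + viaOwner
        + "\n  contextLoader=" + contextLoader
        + "\n  resource via context loader=" + viaContext;
  }

  public static void main(String[] args) {
    // Probes this class itself; inside the job the owner would be NettyClient.class.
    System.out.println(describe(ResourceProbe.class, "inCore.txt"));
  }
}
```

If the code source turns out to be a different jar than expected, or the two loaders disagree about the resource, that would point at the class loading setup rather than at the resource lookup code itself.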
3. put "inCore.txt" in the resources of statefun-flink-core and built the
entire project
4. ran an arbitrary test and evaluated the following expression in IntelliJ:
{code:java}
NettyClient.getResourceContentForTest("inCore.txt") {code}
which resulted in
{code:java}
resource: inCore.txtinputStream is Hello from inCore.txt. nice {code}
5. created an uber-jar with a module.yaml of
{code:java}
kind: io.statefun.endpoints.v2/http
spec:
  functions: com.example.fns/*
  urlPathTemplate: https://bar.foo.com:8080/{function.name}
  transport:
    type: io.statefun.transports.v1/async
    timeouts:
      call: 1 min
      read: 10 sec
      write: 10 sec
---
kind: io.statefun.kafka.v1/ingress
spec:
  id: com.example/users
  address: kafka:9092
  consumerGroupId: my-consumer-group
  startupPosition:
    type: latest
  topics:
    - topic: messages-1
      valueType: com.example/User
      targets:
        - com.example.fns/greeter {code}
and a pom of
{code:java}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>statefun-jobby</artifactId>
<version>3.1</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.5</flink.version>
<statefun.version>3.1.1</statefun.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- 3rd party -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.7.1</version>
</dependency>
<!-- Stateful Functions sdk -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-sdk-embedded</artifactId>
<version>${statefun.version}</version>
</dependency>
<!-- statefun-flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-io</artifactId>
<version>${statefun.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-io-bundle</artifactId>
<version>${statefun.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-core</artifactId>
<version>${statefun.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-launcher</artifactId>
<version>${statefun.version}</version>
</dependency>
<!-- flink runtime metrics -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink runtime is always provided -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- We use the maven-shade plugin to create a fat jar that
contains all necessary dependencies. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the
META-INF folder.
Otherwise, this might cause
SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<artifact>org.apache.kafka:*</artifact>
<excludes>
<exclude>kafka/kafka-version.properties</exclude>
<exclude>LICENSE</exclude>
<!-- Does not contain anything relevant.
Cites a binary dependency on
jersey, but this is neither reflected in the
dependency graph, nor are any
jersey files bundled. -->
<exclude>NOTICE</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>
org.apache.flink.statefun.flink.core.StatefulFunctionsJob
</mainClass>
</transformer>
<!-- required to aggregate all the
META-INF/services files -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<!-- remove all duplicate licenses -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
</transformer>
<!-- explicitly include our LICENSE file,
located at project root dir -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/LICENSE</resource>
<file>${basedir}/../../LICENSE</file>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
<projectName>Apache Flink Stateful Functions (flink-statefun)</projectName>
<encoding>UTF-8</encoding>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> {code}
6. replaced only NettyClient.class in the uber-jar with my hacked core
NettyClient class from 2)
{code:java}
jar uf statefun-jobby-3.1.jar org\apache\flink\statefun\flink\core\nettyclient\NettyClient.class {code}
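Since this step only swaps the class file, one thing that might be worth checking is whether the resulting uber-jar contains both the patched class and inCore.txt. The sketch below uses only java.util.jar from the JDK; JarEntryCheck and missingEntries are hypothetical names chosen for illustration, and the entry names would be passed in by the caller. Note that jar entry names use forward slashes regardless of platform.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.jar.JarFile;

/** Hypothetical helper for illustration: reports which entries a jar lacks. */
public final class JarEntryCheck {

  private JarEntryCheck() {}

  /** Returns the subset of {@code entryNames} that is absent from the jar at {@code jarPath}. */
  public static List<String> missingEntries(String jarPath, String... entryNames)
      throws IOException {
    List<String> missing = new ArrayList<>();
    try (JarFile jar = new JarFile(jarPath)) {
      for (String name : entryNames) {
        // getJarEntry returns null when no entry with that exact name exists.
        if (jar.getJarEntry(name) == null) {
          missing.add(name);
        }
      }
    }
    return missing;
  }

  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      System.err.println("usage: JarEntryCheck <jar> <entry> [<entry>...]");
      return;
    }
    String[] entries = Arrays.copyOfRange(args, 1, args.length);
    System.out.println("missing: " + missingEntries(args[0], entries));
  }
}
```

For example, it could be run as `java JarEntryCheck statefun-jobby-3.1.jar inCore.txt org/apache/flink/statefun/flink/core/nettyclient/NettyClient.class`. An empty list would mean both entries are present in the jar.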
7. uploaded the uber-jar to my Flink cluster from 1) and pushed a message onto
Kafka
{code:java}
~/kafka/kafka_2.12-3.1.0/bin$ ./kafka-console-producer.sh --topic messages-1 --bootstrap-server localhost:29092 --property parse.key=true --property "key.separator=:"
>hi:there {code}
and the logs in the TaskManager showed
{code:java}
2022-01-31 22:15:03,855 INFO [some kafka log]
FINDME resource: inCore.txtinputStream is null for inCore.txt
2022-01-31 22:15:16,057 INFO org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction [] - Bootstrapping function FunctionType(com.example.fns, greeter). Blocking processing until first request is completed. Successive requests will be performed asynchronously. {code}
Many, many thanks
Fil
> Support additional TLS configuration.
> -------------------------------------
>
> Key: FLINK-25866
> URL: https://issues.apache.org/jira/browse/FLINK-25866
> Project: Flink
> Issue Type: Improvement
> Components: Stateful Functions
> Reporter: Igal Shilman
> Priority: Major
>
> Currently the default HTTP client used to invoke remote functions does not
> support customising the TLS settings as part of the endpoint spec definition.
> This includes using self-signed certificates, and providing client side
> certificates for authentication (which is a slightly different requirement).
> This issue is about including additional TLS settings to the default endpoint
> resource definition, and supporting them in statefun-core.
> User mailing list threads:
> * [client cert auth in remote function|https://lists.apache.org/thread/97nw245kxqp32qglwfynhhgyhgp2pxvg]
> * [endpoint self-signed certificate problem|https://lists.apache.org/thread/y2m2bpwg4n71rxfont6pgky2t8m19n7w]