[
https://issues.apache.org/jira/browse/FLINK-25443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479307#comment-17479307
]
Ashok commented on FLINK-25443:
-------------------------------
Thanks, Igal. I am trying to create a stateful function. The ingest data comes
from Kafka.
I am able to compile the code, but when I run it in a Flink cluster I get the
above error. I am attaching the code and the pom.xml below.
package p3;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import static
org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.Logger;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
import org.apache.flink.statefun.flink.core.message.RoutableMessageBuilder;
import org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder;
import
org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.state.PersistedValue;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.net.URI;
import java.time.Duration;
/**
 * Example job that reads strings from a Kafka topic, prints them, and routes each one to an
 * embedded stateful function through the StateFun DataStream builder. A remote request-reply
 * function is registered on the same builder.
 */
public class stf {
  /** Kafka topic the job consumes from. */
  static final String TOPIC_IN = "quickstart-events";

  /** Kafka bootstrap server address. */
  static final String BOOTSTRAP_SERVER = "localhost:9092";

  /** Function type under which {@link MyFunction} is registered and to which ingress is routed. */
  public static final FunctionType GREET = new FunctionType("example", "greet");

  /**
   * Builds the Kafka source, wires it into the StateFun DataStream builder, and executes the job.
   *
   * @param args command-line arguments; not read by this method
   * @throws Exception propagated from building or executing the job
   */
  public static void main(String[] args) throws Exception {
    // A locally constructed ObjectMapper was removed here: it was never handed to any other
    // object in this method, so registering modules on it could not influence the job.
    FunctionType remoteGreet = new FunctionType("example", "remote-greet");
    EgressIdentifier<String> greetings =
        new EgressIdentifier<>("example", "greetings", String.class);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    KafkaSource<String> source =
        KafkaSource.<String>builder()
            .setBootstrapServers(BOOTSTRAP_SERVER)
            .setTopics(TOPIC_IN)
            .setGroupId("my-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    stream.print();

    StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
    statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);

    // Each record is used both as the target function id and as the message body.
    DataStream<RoutableMessage> namesIngress =
        stream.map(
            name ->
                RoutableMessageBuilder.builder()
                    .withTargetAddress(GREET, name)
                    .withMessageBody(name)
                    .build());

    // The configuration prepared above is now handed to the builder; previously it was
    // created and modified but never passed on, so the factory-type setting had no effect.
    StatefulFunctionDataStreamBuilder.builder("example")
        .withDataStreamAsIngress(namesIngress)
        .withFunctionProvider(GREET, unused -> new MyFunction())
        .withRequestReplyRemoteFunction(
            requestReplyFunctionBuilder(
                    remoteGreet, URI.create("http://localhost:5000/statefun"))
                .withMaxRequestDuration(Duration.ofSeconds(15))
                .withReadTimeout(Duration.ofSeconds(10))
                .withMaxNumBatchRequests(500))
        .withEgressId(greetings)
        .withConfiguration(statefunConfig)
        .build(env);

    env.execute("Flink FCD Consumer");
  }

  /** Embedded function that counts its invocations in persisted state and forwards its input. */
  private static final class MyFunction implements StatefulFunction {
    @Persisted
    private final PersistedValue<Integer> seenCount = PersistedValue.of("seen", Integer.class);

    /**
     * Increments the persisted counter, prints the input, and sends the input to {@link #GREET}
     * using {@code input.toString()} as the target id.
     *
     * @param context context used to send the outgoing message
     * @param input the incoming message
     */
    @Override
    public void invoke(Context context, Object input) {
      seenCount.updateAndGet(MyFunction::increment);
      System.out.println("MyFunction: " + input.toString());
      // NOTE(review): main registers this class for GREET and routes every ingress record to
      // GREET with the record itself as the id, so for those records this send appears to
      // target the very address that just received the message. Confirm this is intended.
      context.send(GREET, input.toString(), input);
    }

    /**
     * Returns the next counter value.
     *
     * @param n the current value, or {@code null} when no value is stored yet
     * @return 1 when {@code n} is {@code null}, otherwise {@code n + 1}
     */
    private static int increment(@Nullable Integer n) {
      return n == null ? 1 : n + 1;
    }
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the example StateFun DataStream job (org.example:dec1). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>dec1</artifactId>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <!-- Shared version for the Jackson modules below. They previously had no <version>, and
         this POM declares no parent or dependencyManagement that could supply one.
         2.13.1 is the version quoted for jackson-datatype-jsr310 in the issue description. -->
    <jackson.version>2.13.1</jackson.version>
  </properties>
  <dependencies>
    <!-- NOTE(review): statefun 3.1.0 is combined with Flink 1.14.2 artifacts below;
         confirm that this StateFun release supports that Flink version. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>statefun-flink-datastream</artifactId>
      <version>3.1.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.14.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>statefun-sdk-embedded</artifactId>
      <version>3.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.12</artifactId>
      <version>1.14.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.0.0</version>
      <scope>provided</scope>
    </dependency>
    <!-- NOTE(review): the stack trace in this thread names classes under
         org.apache.flink.shaded.jackson2; confirm whether the unshaded com.fasterxml
         modules declared here are visible to that shaded ObjectMapper at all. -->
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-parameter-names</artifactId>
      <version>${jackson.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- Java 8 Date/time -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jsr310</artifactId>
      <version>${jackson.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- Java 8 Datatypes -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jdk8</artifactId>
      <version>${jackson.version}</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
</project>
> Embedded stateful function Jackson / Java 8 date/time issue
> -----------------------------------------------------------
>
> Key: FLINK-25443
> URL: https://issues.apache.org/jira/browse/FLINK-25443
> Project: Flink
> Issue Type: Bug
> Components: Stateful Functions
> Affects Versions: shaded-14.0
> Reporter: Ashok
> Priority: Major
>
> Hi
> I have the Jackson dependency in the pom.xml, but I am getting the following error.
> <dependency>
> <groupId>com.fasterxml.jackson.datatype</groupId>
> <artifactId>jackson-datatype-jsr310</artifactId>
> <version>2.13.1</version>
> <scope>provided</scope>
> </dependency>
>
> Caused by:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
> Java 8 date/time type `java.time.Duration` not supported by default: add
> Module
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
> to enable handling (through reference chain:
> org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec["timeouts"]->org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec$Timeouts["call"])
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1276)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.UnsupportedTypeSerializer.serialize(UnsupportedTypeSerializer.java:35)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3126)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree(ObjectMapper.java:3307)
--
This message was sent by Atlassian Jira
(v8.20.1#820001)