[ 
https://issues.apache.org/jira/browse/FLINK-25443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479307#comment-17479307
 ] 

Ashok commented on FLINK-25443:
-------------------------------

Thanks Igal.I am trying to create  a stateful function .The ingest data is from 
kafka .

I am able to compile the code ,But when i run in flink cluster i am getting the 
above error .I am attaching the code

 

package p3;


import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import static 
org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;

import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.Logger;
import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
import org.apache.flink.statefun.flink.core.message.RoutableMessageBuilder;
import org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder;
import 
org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.state.PersistedValue;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.checkerframework.checker.nullness.qual.Nullable;


import java.net.URI;
import java.time.Duration;

public class stf {
static String TOPIC_IN = "quickstart-events";
static String BOOTSTRAP_SERVER = "localhost:9092";
public static FunctionType GREET = new FunctionType("example", "greet");

public static void main(String[] args) throws Exception {

ObjectMapper mapper = new ObjectMapper();
mapper.findAndRegisterModules();

FunctionType REMOTE_GREET = new FunctionType("example", "remote-greet");
EgressIdentifier<String> GREETINGS =
new EgressIdentifier<>("example", "greetings", String.class);

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source =
KafkaSource.<String>builder()
.setBootstrapServers(BOOTSTRAP_SERVER)
.setTopics(TOPIC_IN)
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

DataStream<String> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// DataStream<String> stream =env.socketTextStream("localhost",8182);

stream.print();
/* DataStream<RoutableMessage> namesIngress =
stream.map(
name ->
RoutableMessageBuilder.builder()
.withTargetAddress(new FunctionType("example", "greet"), name)
.withMessageBody(name)
.build());

*/
StatefulFunctionsConfig statefunConfig = 
StatefulFunctionsConfig.fromEnvironment(env);
statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);


DataStream<RoutableMessage> namesIngress =
stream.map(
name ->
RoutableMessageBuilder.builder()
.withTargetAddress(GREET, name)
.withMessageBody(name)
.build());

/* StatefulFunctionEgressStreams egresses =
StatefulFunctionDataStreamBuilder.builder("example")
.withDataStreamAsIngress(namesIngress)
.withRequestReplyRemoteFunction(
RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
REMOTE_GREET, URI.create("http://localhost:5000/statefun";))
.withMaxNumBatchRequests(500))
.withEgressId(GREETINGS)
.build(env);



*/

StatefulFunctionEgressStreams out =
StatefulFunctionDataStreamBuilder.builder("example")
.withDataStreamAsIngress(namesIngress)
.withFunctionProvider(GREET, unused -> new MyFunction())
.withRequestReplyRemoteFunction(
requestReplyFunctionBuilder(
REMOTE_GREET, URI.create("http://localhost:5000/statefun";))
.withMaxRequestDuration(Duration.ofSeconds(15)).withReadTimeout(Duration.ofSeconds(10))
.withMaxNumBatchRequests(500) )
.withEgressId(GREETINGS)
.build(env);

env.execute("Flink FCD Consumer");
}

private static final class MyFunction implements StatefulFunction {

@Persisted
private final PersistedValue<Integer> seenCount = PersistedValue.of("seen", 
Integer.class);

@Override
public void invoke(Context context, Object input) {
int seen = seenCount.updateAndGet(MyFunction::increment);
System.out.println("MyFunction: " + input.toString());
context.send(GREET, input.toString(), input);
}

private static int increment(@Nullable Integer n) {
return n == null ? 1 : n + 1;
}
}
}

 

 

 

 

 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0";
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>dec1</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-datastream</artifactId>
<version>3.1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-sdk-embedded</artifactId>
<version>3.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<scope>compile</scope>
</dependency>

<!-- Java 8 Date/time -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<scope>compile</scope>
</dependency>

<!-- Java 8 Datatypes -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<scope>compile</scope>
</dependency>


</dependencies>

</project>

> Embedded statefulfunction jakson /java8 date time issue
> -------------------------------------------------------
>
>                 Key: FLINK-25443
>                 URL: https://issues.apache.org/jira/browse/FLINK-25443
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: shaded-14.0
>            Reporter: Ashok
>            Priority: Major
>
> Hi
> I have the jackson dependency in the pom.xml.But getting following error .
> <dependency>
> <groupId>com.fasterxml.jackson.datatype</groupId>
> <artifactId>jackson-datatype-jsr310</artifactId>
> <version>2.13.1</version>
> <scope>provided</scope>
> </dependency>
>  
> Caused by: 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
>  Java 8 date/time type `java.time.Duration` not supported by default: add 
> Module 
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
>  to enable handling (through reference chain: 
> org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec["timeouts"]->org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec$Timeouts["call"])
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1276)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.UnsupportedTypeSerializer.serialize(UnsupportedTypeSerializer.java:35)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3126)
>  at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree(ObjectMapper.java:3307)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to