[
https://issues.apache.org/jira/browse/FLINK-22584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-22584:
-----------------------------------
Labels: auto-deprioritized-major developer-experience (was:
developer-experience stale-major)
Priority: Minor (was: Major)
This issue was labeled "stale-major" 7 days ago and has not received any
updates so it is being deprioritized. If this ticket is actually Major, please
raise the priority and ask a committer to assign you the issue or revive the
public discussion.
> Use protobuf-shaded in StateFun core.
> -------------------------------------
>
> Key: FLINK-22584
> URL: https://issues.apache.org/jira/browse/FLINK-22584
> Project: Flink
> Issue Type: Improvement
> Components: Stateful Functions
> Reporter: Igal Shilman
> Priority: Minor
> Labels: auto-deprioritized-major, developer-experience
> Fix For: statefun-3.1.0
>
>
> We have the *statefun-protobuf-shaded* module, which was introduced for the
> remote Java SDK.
> We can use it to shade Protobuf internally and so reduce the dependency surface.
> The major hurdle is that embedded functions must still accept instances of
> Protobuf messages generated by the user.
> For example:
> {code:java}
> UserProfile userProfile = UserProfile.newBuilder().build();
> context.send(..., userProfile);
> {code}
> If we simply use the shaded Protobuf version, we will immediately get a
> ClassCastException, since the user's generated classes extend the unshaded
> Protobuf base types rather than the relocated ones.
> One way to overcome this is to use reflection to find the well-known methods
> on the generated classes and call toByteArray() / parseFrom() reflectively.
> This, however, causes a significant slowdown, even when using MethodHandles.
> A small experiment that I did previously with ByteBuddy mitigates this by
> generating accessors in pre-flight:
> {code:java}
> package org.apache.flink.statefun.flink.common.protobuf.serde;
>
> import static net.bytebuddy.matcher.ElementMatchers.named;
>
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.lang.reflect.InvocationTargetException;
> import java.lang.reflect.Method;
> import net.bytebuddy.ByteBuddy;
> import net.bytebuddy.dynamic.DynamicType;
> import net.bytebuddy.implementation.FixedValue;
> import net.bytebuddy.implementation.MethodCall;
> import net.bytebuddy.implementation.bytecode.assign.Assigner;
>
> final class ReflectiveProtobufSerde {
>
>   @SuppressWarnings({"unchecked", "rawtypes"})
>   static <M> ProtobufSerde<M> ofProtobufGeneratedType(Class<M> type) {
>     try {
>       DynamicType.Unloaded<ProtobufSerde> unloaded = configureByteBuddy(type);
>       // load the generated serde in the same class loader as the user's message
>       Class<? extends ProtobufSerde> writer =
>           unloaded.load(type.getClassLoader()).getLoaded();
>       return (ProtobufSerde<M>) writer.getDeclaredConstructor().newInstance();
>     } catch (Throwable e) {
>       // keep the cause so that failures during class generation stay diagnosable
>       throw new IllegalArgumentException(
>           "Unable to generate a ProtobufSerde for " + type.getName(), e);
>     }
>   }
>
>   @SuppressWarnings("rawtypes")
>   private static DynamicType.Unloaded<ProtobufSerde> configureByteBuddy(Class<?> type)
>       throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
>     // look up the well-known methods of the generated message once, up front
>     Method writeToMethod = type.getMethod("writeTo", OutputStream.class);
>     Method parseFromMethod = type.getMethod("parseFrom", InputStream.class);
>     Method getSerializedSizeMethod = type.getMethod("getSerializedSize");
>
>     // get the message full name
>     Method getDescriptorMethod = type.getMethod("getDescriptor");
>     Object descriptor = getDescriptorMethod.invoke(null);
>     Method getFullNameMethod = descriptor.getClass().getMethod("getFullName");
>     String messageFullName = (String) getFullNameMethod.invoke(descriptor);
>
>     return new ByteBuddy()
>         .subclass(ProtobufSerde.class)
>         .typeVariable("M", type)
>         // writeTo(message, out) -> message.writeTo(out)
>         .method(named("writeTo"))
>         .intercept(
>             MethodCall.invoke(writeToMethod)
>                 .onArgument(0)
>                 .withArgument(1)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         // parseFrom(in) -> static call to the generated parseFrom(in)
>         .method(named("parseFrom"))
>         .intercept(MethodCall.invoke(parseFromMethod).withArgument(0))
>         // getSerializedSize(message) -> message.getSerializedSize()
>         .method(named("getSerializedSize"))
>         .intercept(
>             MethodCall.invoke(getSerializedSizeMethod)
>                 .onArgument(0)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         // the full name is resolved once and baked into the class as a constant
>         .method(named("getMessageFullName"))
>         .intercept(FixedValue.value(messageFullName))
>         .make();
>   }
> }
> {code}
>
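
For comparison, the plain reflective route that the description mentions (and rejects as too slow) could look roughly like the sketch below. It is only an illustration under stated assumptions: the nested Serde interface is a hypothetical stand-in whose shape is inferred from the method names intercepted in the ByteBuddy snippet, and FakeUserProfile is an invented class that merely mimics the writeTo / parseFrom / getSerializedSize methods of a generated message, so that the example runs without a Protobuf dependency (Java 9+ for readAllBytes).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.charset.StandardCharsets;

final class ReflectiveSerdeSketch {

  /** Hypothetical stand-in for ProtobufSerde; the real interface is not shown in the issue. */
  interface Serde<M> {
    void writeTo(M message, OutputStream out) throws IOException;
    M parseFrom(InputStream in) throws IOException;
    int getSerializedSize(M message);
  }

  /** Invented class that only mimics the method names of a generated message. */
  static final class FakeUserProfile {
    private final byte[] payload;

    FakeUserProfile(String name) {
      this.payload = name.getBytes(StandardCharsets.UTF_8);
    }

    public void writeTo(OutputStream out) throws IOException {
      out.write(payload);
    }

    public int getSerializedSize() {
      return payload.length;
    }

    public static FakeUserProfile parseFrom(InputStream in) throws IOException {
      return new FakeUserProfile(new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }

    String name() {
      return new String(payload, StandardCharsets.UTF_8);
    }
  }

  /** Resolves the three methods once; every later call still goes through a MethodHandle. */
  static <M> Serde<M> of(Class<M> type) {
    try {
      MethodHandles.Lookup lookup = MethodHandles.lookup();
      MethodHandle writeToHandle =
          lookup.findVirtual(type, "writeTo", MethodType.methodType(void.class, OutputStream.class));
      MethodHandle parseFromHandle =
          lookup.findStatic(type, "parseFrom", MethodType.methodType(type, InputStream.class));
      MethodHandle sizeHandle =
          lookup.findVirtual(type, "getSerializedSize", MethodType.methodType(int.class));

      return new Serde<M>() {
        @Override
        public void writeTo(M message, OutputStream out) throws IOException {
          try {
            writeToHandle.invoke(message, out);
          } catch (IOException e) {
            throw e;
          } catch (Throwable t) {
            throw new IllegalStateException(t);
          }
        }

        @Override
        @SuppressWarnings("unchecked")
        public M parseFrom(InputStream in) throws IOException {
          try {
            return (M) parseFromHandle.invoke(in);
          } catch (IOException e) {
            throw e;
          } catch (Throwable t) {
            throw new IllegalStateException(t);
          }
        }

        @Override
        public int getSerializedSize(M message) {
          try {
            return (int) sizeHandle.invoke(message);
          } catch (Throwable t) {
            throw new IllegalStateException(t);
          }
        }
      };
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException(type.getName() + " lacks the expected methods", e);
    }
  }

  public static void main(String[] args) throws IOException {
    Serde<FakeUserProfile> serde = of(FakeUserProfile.class);
    FakeUserProfile original = new FakeUserProfile("alice");
    ByteArrayOutputStream out = new ByteArrayOutputStream(serde.getSerializedSize(original));
    serde.writeTo(original, out);
    FakeUserProfile copy = serde.parseFrom(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(copy.name());
  }
}
```

The core never refers to the message type statically here, which is what lets a shaded Protobuf coexist with user classes; the ByteBuddy variant in the description keeps that property but replaces the per-call MethodHandle dispatch with generated direct calls.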
--
This message was sent by Atlassian Jira
(v8.3.4#803005)