Igal Shilman created FLINK-22584:
------------------------------------

             Summary: Use protobuf-shaded in StateFun core.
                 Key: FLINK-22584
                 URL: https://issues.apache.org/jira/browse/FLINK-22584
             Project: Flink
          Issue Type: Improvement
          Components: Stateful Functions
            Reporter: Igal Shilman


We already have the *statefun-protobuf-shaded* module, which is used by the remote Java SDK.

We can use it to shade Protobuf internally in StateFun core as well.

The major hurdle we need to overcome is that, in embedded functions, we have to 
be able to accept instances of Protobuf-generated message classes supplied by the user.

For example:
{code:java}
UserProfile userProfile = UserProfile.newBuilder().build();
context.send(..., userProfile);
{code}
If we simply switch to the shaded Protobuf version, we will immediately get a 
ClassCastException.
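
To illustrate the failure mode, here is a minimal sketch that uses stand-in types only. {{UnshadedMessage}} and {{ShadedMessage}} are hypothetical interfaces that play the role of the user's Protobuf message interface and its relocated copy; no real Protobuf classes are involved.

```java
// Stand-in types only: UnshadedMessage plays the role of the Protobuf
// interface the user compiled against, ShadedMessage its relocated copy.
final class ShadedCastDemo {
  interface UnshadedMessage {}

  interface ShadedMessage {}

  // A user's generated class implements the unshaded interface only.
  static final class UserProfile implements UnshadedMessage {}

  // Mimics runtime code that was compiled against the shaded interface.
  static boolean acceptedByShadedRuntime(Object userMessage) {
    try {
      ShadedMessage unused = (ShadedMessage) userMessage;
      return true;
    } catch (ClassCastException e) {
      // Same simple name, different type: the cast cannot succeed.
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(acceptedByShadedRuntime(new UserProfile())); // prints false
  }
}
```

The two interfaces are unrelated as far as the JVM is concerned, which is exactly the situation after relocation.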

One way to overcome this is to use reflection: look up the well-known methods 
on the generated classes and call toByteArray() / parseFrom() reflectively.
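
The reflective approach could be sketched roughly as follows. This is only an illustration: {{FakeMessage}} is a hypothetical stand-in for a generated message class (so the sketch runs without Protobuf on the classpath), and it assumes the generated class exposes a public {{toByteArray()}} and a static {{parseFrom(byte[])}}.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

final class ReflectiveCallDemo {

  // Hypothetical stand-in for a protoc-generated message class.
  static final class FakeMessage {
    final String payload;

    FakeMessage(String payload) {
      this.payload = payload;
    }

    public byte[] toByteArray() {
      return payload.getBytes(StandardCharsets.UTF_8);
    }

    public static FakeMessage parseFrom(byte[] bytes) {
      return new FakeMessage(new String(bytes, StandardCharsets.UTF_8));
    }
  }

  // Serialize without referencing any Protobuf type at compile time.
  static byte[] serialize(Object message) throws ReflectiveOperationException {
    Method toByteArray = message.getClass().getMethod("toByteArray");
    return (byte[]) toByteArray.invoke(message);
  }

  // Deserialize through the static parseFrom(byte[]) of the given class.
  static <M> M deserialize(Class<M> type, byte[] bytes) throws ReflectiveOperationException {
    Method parseFrom = type.getMethod("parseFrom", byte[].class);
    return type.cast(parseFrom.invoke(null, (Object) bytes));
  }

  public static void main(String[] args) throws Exception {
    byte[] bytes = serialize(new FakeMessage("hello"));
    System.out.println(deserialize(FakeMessage.class, bytes).payload); // prints hello
  }
}
```

A real implementation would at least cache the looked-up {{Method}} objects per class instead of resolving them on every call.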

This, however, causes a significant slowdown, even with MethodHandles.
A small experiment that I did previously with ByteBuddy mitigates this by 
generating the accessors up front, in a pre-flight step:

{code:java}
package org.apache.flink.statefun.flink.common.protobuf.serde;

import static net.bytebuddy.matcher.ElementMatchers.named;

import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.implementation.FixedValue;
import net.bytebuddy.implementation.MethodCall;
import net.bytebuddy.implementation.bytecode.assign.Assigner;

final class ReflectiveProtobufSerde {

  @SuppressWarnings({"unchecked", "rawtypes"})
  static <M> ProtobufSerde<M> ofProtobufGeneratedType(Class<M> type) {
    try {
      DynamicType.Unloaded<ProtobufSerde> unloaded = configureByteBuddy(type);
      // load the generated class with the same class loader as the user's message type
      Class<? extends ProtobufSerde> writer = unloaded.load(type.getClassLoader()).getLoaded();
      return (ProtobufSerde<M>) writer.getDeclaredConstructor().newInstance();
    } catch (Throwable e) {
      // keep the cause, otherwise a failed generation is impossible to diagnose
      throw new IllegalArgumentException(
          "Unable to generate a ProtobufSerde for " + type.getName(), e);
    }
  }

  @SuppressWarnings("rawtypes")
  private static DynamicType.Unloaded<ProtobufSerde> configureByteBuddy(Class<?> type)
      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
    // look up the well known methods of the generated class once, up front
    Method writeToMethod = type.getMethod("writeTo", OutputStream.class);
    Method parseFromMethod = type.getMethod("parseFrom", InputStream.class);
    Method getSerializedSizeMethod = type.getMethod("getSerializedSize");

    // get the message full name
    Method getDescriptorMethod = type.getMethod("getDescriptor");
    Object descriptor = getDescriptorMethod.invoke(null);
    Method getFullNameMethod = descriptor.getClass().getMethod("getFullName");
    String messageFullName = (String) getFullNameMethod.invoke(descriptor);

    return new ByteBuddy()
        .subclass(ProtobufSerde.class)
        .typeVariable("M", type)
        // writeTo(message, out) -> message.writeTo(out)
        .method(named("writeTo"))
        .intercept(
            MethodCall.invoke(writeToMethod)
                .onArgument(0)
                .withArgument(1)
                .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
        // parseFrom(in) -> the static parseFrom(in) of the generated class
        .method(named("parseFrom"))
        .intercept(MethodCall.invoke(parseFromMethod).withArgument(0))
        // getSerializedSize(message) -> message.getSerializedSize()
        .method(named("getSerializedSize"))
        .intercept(
            MethodCall.invoke(getSerializedSizeMethod)
                .onArgument(0)
                .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
        // the full name is resolved once and baked in as a constant
        .method(named("getMessageFullName"))
        .intercept(FixedValue.value(messageFullName))
        .make();
  }
}
 {code}
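
The {{ProtobufSerde}} type is not shown above. Judging only from the intercepted method names and argument positions, it is assumed here to look roughly like the interface below; the exact signatures and checked exceptions are a guess. The hand-written {{FakeMessageSerde}} shows what the generated subclass would be equivalent to for a hypothetical stand-in message type {{FakeMessage}}, with direct calls and no reflection on the hot path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class GeneratedSerdeSketch {

  // Assumed shape, inferred from the method names intercepted above.
  interface ProtobufSerde<M> {
    void writeTo(M message, OutputStream out) throws IOException;

    M parseFrom(InputStream in) throws IOException;

    int getSerializedSize(M message);

    String getMessageFullName();
  }

  // Hypothetical stand-in for a protoc-generated message class.
  static final class FakeMessage {
    private final byte[] bytes;

    FakeMessage(byte[] bytes) {
      this.bytes = bytes;
    }

    String payload() {
      return new String(bytes, StandardCharsets.UTF_8);
    }

    public void writeTo(OutputStream out) throws IOException {
      out.write(bytes);
    }

    public int getSerializedSize() {
      return bytes.length;
    }

    public static FakeMessage parseFrom(InputStream in) throws IOException {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      byte[] chunk = new byte[256];
      int read;
      while ((read = in.read(chunk)) != -1) {
        buffer.write(chunk, 0, read);
      }
      return new FakeMessage(buffer.toByteArray());
    }
  }

  // What the generated accessor class boils down to: plain delegating calls.
  static final class FakeMessageSerde implements ProtobufSerde<FakeMessage> {
    public void writeTo(FakeMessage message, OutputStream out) throws IOException {
      message.writeTo(out);
    }

    public FakeMessage parseFrom(InputStream in) throws IOException {
      return FakeMessage.parseFrom(in);
    }

    public int getSerializedSize(FakeMessage message) {
      return message.getSerializedSize();
    }

    public String getMessageFullName() {
      return "example.FakeMessage"; // made-up name for the stand-in type
    }
  }

  // Serialize and parse back through the serde only.
  static String roundTrip(String payload) throws IOException {
    ProtobufSerde<FakeMessage> serde = new FakeMessageSerde();
    FakeMessage message = new FakeMessage(payload.getBytes(StandardCharsets.UTF_8));
    ByteArrayOutputStream out = new ByteArrayOutputStream(serde.getSerializedSize(message));
    serde.writeTo(message, out);
    return serde.parseFrom(new ByteArrayInputStream(out.toByteArray())).payload();
  }
}
```

The caller only ever sees {{ProtobufSerde<M>}}, so the StateFun core would not need a compile-time reference to the user's Protobuf classes.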
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
