Hi, Please post to the Dev mailing list in English.
Best regards, Martijn On Tue, Dec 13, 2022 at 9:03 AM wangshuai <[email protected]> wrote: > > 在使用kafka自定义序列化时会导致对应的class无法加载的问题。通过分析,现有代码在使用AppClassLoader类加载器先加载了KafkaSerializerWrapper时,用户提交任务自己编写的类是无法通过AppClassLoader加载的,通过FlinkUserCodeClassLoader加载的话需要KafkaSerializerWrapper也是通过FlinkUserCodeClassLoader加载。 > public void open(InitializationContext context) throws Exception { > final ClassLoader userCodeClassLoader = > context.getUserCodeClassLoader().asClassLoader(); > try (TemporaryClassLoaderContext ignored = > TemporaryClassLoaderContext.of(userCodeClassLoader)) { > serializer = > InstantiationUtil.instantiate( > serializerClass.getName(), > Serializer.class, > getClass().getClassLoader()); // ?? 似乎应该如此 > Thread.currentThread().getContextClassLoader() > > if (serializer instanceof Configurable) { > ((Configurable) serializer).configure(config); > } else { > serializer.configure(config, isKey); > } > } catch (Exception e) { > throw new IOException("Failed to instantiate the serializer of class " + > serializer, e); > } > }
