1996fanrui commented on code in PR #23550:
URL: https://github.com/apache/flink/pull/23550#discussion_r1371104358
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -368,7 +373,8 @@ public void setStreamOperator(StreamOperator<?> operator) {
public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
if (factory != null) {
- toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory);
+ toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
+ config.setString(SERIALIZED_UDF_CLASS_NAME, factory.getClass().getName());
Review Comment:
Hi @pnowojski, thanks for your analysis!
> I would move:
> SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)
> check, into the boolean StreamConfig#isSinkWriterOperatorFactory(Class<...> ...) method.
> It doesn't fit there very well, BUT at least it would justify why we have
> the checkState in the StreamConfig#setStreamOperatorFactory.
Solution 1
(`SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)`)
is still fragile, right?
If someone makes `SinkWriterOperatorFactory` non-final and implements a subclass in
the future, the exact-name check will not recognize the subclass, and it will fail silently.
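To make the fragility concrete, here is a minimal standalone sketch. `BaseSinkFactory` and `CustomSinkFactory` are hypothetical stand-ins, not Flink classes; the point is only that an exact class-name comparison rejects a subclass that an `instanceof` check accepts.

```java
// Hypothetical stand-ins for SinkWriterOperatorFactory and a future subclass.
class BaseSinkFactory {}

class CustomSinkFactory extends BaseSinkFactory {}

class ClassNameCheckDemo {
    // Mirrors the "solution 1" style check: exact class-name equality.
    static boolean matchesByName(Object factory) {
        return BaseSinkFactory.class.getName().equals(factory.getClass().getName());
    }

    // Type-based check that also accepts subclasses.
    static boolean matchesByType(Object factory) {
        return factory instanceof BaseSinkFactory;
    }

    public static void main(String[] args) {
        Object subclassInstance = new CustomSinkFactory();
        // The name check misses the subclass; the type check does not.
        System.out.println("by name: " + matchesByName(subclassInstance));
        System.out.println("by type: " + matchesByType(subclassInstance));
    }
}
```

Nothing fails loudly in the name-based variant: the subclass is simply treated as "not a sink writer factory", which is the silent failure mentioned above.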
> toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
> config.setString(IS_INSTANCE_OF_SinkWriterOperatorFactory, factory instance of SinkWriterOperatorFactory);
> would be better/cleaner. Either one is fine for me.
Solution 2 is fully compatible with adding a subclass.
However, as I said before, `getStreamOperatorFactory` is called in `toString`
only to print the class name, and I'd like to use
`SERIALIZED_UDF_CLASS_NAME` there instead of `getStreamOperatorFactory`.
If we only keep `IS_INSTANCE_OF_SinkWriterOperatorFactory`, we must still call
`getStreamOperatorFactory` in the `toString` method.
Or should we add `IS_INSTANCE_OF_SinkWriterOperatorFactory` and
`SERIALIZED_UDF_CLASS_NAME` together?
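A rough sketch of what storing both entries could look like. This is an assumption-laden illustration, not the PR's code: a plain `Map<String, String>` stands in for Flink's `Configuration`, the key strings are made up, and `SinkWriterLikeFactory` is a hypothetical placeholder for `SinkWriterOperatorFactory`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical placeholder for SinkWriterOperatorFactory.
class SinkWriterLikeFactory {}

class ConfigSketch {
    // Key strings are invented for this sketch.
    static final String SERIALIZED_UDF_CLASS_NAME = "serializedUdfClassName";
    static final String IS_SINK_WRITER_FACTORY = "isInstanceOfSinkWriterOperatorFactory";

    // Stand-in for the real configuration object.
    private final Map<String, String> config = new HashMap<>();

    void setFactory(Object factory) {
        if (factory != null) {
            // The class name is kept only for printing.
            config.put(SERIALIZED_UDF_CLASS_NAME, factory.getClass().getName());
            // The type decision is made while the live object is still available.
            config.put(IS_SINK_WRITER_FACTORY,
                    String.valueOf(factory instanceof SinkWriterLikeFactory));
        }
    }

    boolean isSinkWriterFactory() {
        // parseBoolean(null) is false, so an unset entry reads as "not a sink writer".
        return Boolean.parseBoolean(config.get(IS_SINK_WRITER_FACTORY));
    }

    // toString-style description that never deserializes the factory.
    String describe() {
        return "operator=" + config.getOrDefault(SERIALIZED_UDF_CLASS_NAME, "<unset>");
    }
}
```

With both entries, the subclass case is handled by the boolean flag, and printing needs only the stored name.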
Actually, I had a solution 3 before I created this PR:
we store `SERIALIZED_UDF_CLASS` instead of `SERIALIZED_UDF_CLASS_NAME`.
```
// In the setStreamOperatorFactory method:
toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
toBeSerializedConfigObjects.put(SERIALIZED_UDF_CLASS, factory.getClass());
```
```
public <T extends StreamOperatorFactory<?>> Class<T> getStreamOperatorFactoryClass(ClassLoader cl) {
    try {
        return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZED_UDF_CLASS, cl);
    } catch (Exception e) {
        throw new StreamTaskException("Could not read the operator factory class.", e);
    }
}
```
And check `isAssignableFrom`:
```
SinkWriterOperatorFactory.class.isAssignableFrom(getStreamOperatorFactoryClass(SinkWriterOperatorFactory.class.getClassLoader()));
```
Solution 3 works; however, I'm worried that when multiple classloaders are
involved, the `isAssignableFrom` check may give the wrong answer.
That's why this PR stores the class name instead of the `Class`.
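The classloader concern can be reproduced outside Flink. The sketch below is a minimal illustration with hypothetical classes (`DemoFactory`, `IsolatingLoader`), assuming the compiled `.class` files are reachable as classpath resources: the same class defined by two loaders has the same name but is a different runtime type, so `isAssignableFrom` returns `false`.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for an operator factory class; not a Flink class.
class DemoFactory {}

// Child-first loader that defines its own copy of DemoFactory
// instead of delegating to its parent.
class IsolatingLoader extends ClassLoader {
    IsolatingLoader(ClassLoader parent) {
        super(parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (!name.equals(DemoFactory.class.getName())) {
            return super.loadClass(name, resolve);
        }
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded != null) {
                return loaded;
            }
            // Read the class bytes via the parent and define them in this loader.
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                byte[] bytes = in.readAllBytes();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }
}

class ClassLoaderIdentityDemo {
    // Loads a second copy of DemoFactory through a fresh isolating loader.
    static Class<?> loadIsolatedCopy() {
        try {
            ClassLoader parent = DemoFactory.class.getClassLoader();
            return new IsolatingLoader(parent).loadClass(DemoFactory.class.getName());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Class<?> copy = loadIsolatedCopy();
        // Same fully qualified name, so a name-based check still matches.
        System.out.println("same name: " + copy.getName().equals(DemoFactory.class.getName()));
        // Different defining loader, so the Class-based check does not.
        System.out.println("assignable: " + DemoFactory.class.isAssignableFrom(copy));
    }
}
```

Whether this situation actually arises for `SinkWriterOperatorFactory` depends on how the class is loaded in a given deployment; the sketch only shows why a `Class`-based comparison is sensitive to it while a name-based one is not.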
WDYT?