slinkydeveloper commented on a change in pull request #18352:
URL: https://github.com/apache/flink/pull/18352#discussion_r784104335
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java
##########
@@ -243,6 +244,36 @@ public static void prepareInstance(ReadableConfig config,
UserDefinedFunction fu
cleanFunction(config, function);
}
+ /**
+ * Returns whether a {@link UserDefinedFunction} can be easily serialized
and identified by only
+ * a fully qualified class name. It must have a default constructor and no
serializable fields.
+ */
+ public static boolean isClassNameSerializable(UserDefinedFunction
function) {
+ final Class<?> functionClass = function.getClass();
+ if (!InstantiationUtil.hasPublicNullaryConstructor(functionClass)) {
+ // function must be parameterized
+ return false;
+ }
+ Class<?> currentClass = functionClass;
+ while (!currentClass.equals(UserDefinedFunction.class)) {
+ for (Field field : currentClass.getDeclaredFields()) {
+ if (!Modifier.isTransient(field.getModifiers())
+ && !Modifier.isStatic(field.getModifiers())) {
+ // function seems to be stateful
+ return false;
+ }
+ }
+ currentClass = currentClass.getSuperclass();
+ }
Review comment:
That sounds like a very magic way to find out whether a function is
stateful or not. Don't we have a marker interface for that?
##########
File path:
flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
##########
@@ -143,34 +162,27 @@ private void testErrorMessage() {
@Nullable String expectedErrorMessage;
- TestSpec(Class<? extends UserDefinedFunction> functionClass) {
+ TestSpec(
+ @Nullable Class<? extends UserDefinedFunction> functionClass,
+ @Nullable UserDefinedFunction functionInstance,
+ @Nullable CatalogFunction catalogFunction) {
this.functionClass = functionClass;
- this.functionInstance = null;
- this.catalogFunction = null;
- }
-
- TestSpec(UserDefinedFunction functionInstance) {
- this.functionClass = null;
this.functionInstance = functionInstance;
- this.catalogFunction = null;
- }
-
- TestSpec(CatalogFunction catalogFunction) {
- this.functionClass = null;
- this.functionInstance = null;
this.catalogFunction = catalogFunction;
}
static TestSpec forClass(Class<? extends UserDefinedFunction>
function) {
- return new TestSpec(function);
+ return new TestSpec(function, null, null);
}
static TestSpec forInstance(UserDefinedFunction function) {
- return new TestSpec(function);
+ return new TestSpec(null, function, null);
}
- static TestSpec forCatalogFunction(String className, FunctionLanguage
language) {
- return new TestSpec(new CatalogFunctionMock(className, language));
+ static TestSpec forCatalogFunction(
+ String className,
+ @SuppressWarnings("SameParameterValue") FunctionLanguage
language) {
Review comment:
What warning is this one?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
##########
@@ -58,9 +60,13 @@
/** Returns a unique, serialized representation for this function. */
public final String functionIdentifier() {
+ final String className = getClass().getName();
+ if (isClassNameSerializable(this)) {
+ return className;
+ }
final String md5 =
EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
- return getClass().getName().replace('.', '$').concat("$").concat(md5);
+ return className.concat("$").concat(md5);
Review comment:
Does this logic works when the function is implemented as a scala
`object`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]