This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9a4fd227a5824e707f4dbe52773a260fbb89854a Author: Jark Wu <[email protected]> AuthorDate: Fri Jul 22 18:43:23 2022 +0800 [FLINK-28451][hive] Borrow Kryo from SerializationUtilities in HiveFunctionWrapper This closes #20211 --- .../factories/HiveFunctionDefinitionFactory.java | 4 +- .../table/functions/hive/HiveFunctionWrapper.java | 56 ++++++++++++++-------- .../apache/flink/table/module/hive/HiveModule.java | 2 +- .../client/gateway/context/ExecutionContext.java | 2 - 4 files changed, 40 insertions(+), 24 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java index 62458186f60..d3cb9f39763 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java @@ -71,9 +71,9 @@ public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory } /** - * Distinguish if the function is a generic function. + * Distinguish if the function is a Flink function. * - * @return whether the function is a generic function + * @return whether the function is a Flink function */ private boolean isFlinkFunction(CatalogFunction catalogFunction, ClassLoader classLoader) { if (catalogFunction.getFunctionLanguage() == FunctionLanguage.PYTHON) { diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java index eebd1e2153f..9c2d8053018 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java @@ -30,6 +30,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.Serializable; +import static org.apache.hadoop.hive.ql.exec.SerializationUtilities.borrowKryo; +import static org.apache.hadoop.hive.ql.exec.SerializationUtilities.releaseKryo; + /** * A wrapper of Hive functions that instantiate function instances and ser/de function instance * cross process boundary. @@ -50,6 +53,7 @@ public class HiveFunctionWrapper<UDFType> implements Serializable { private transient UDFType instance = null; + @SuppressWarnings("unchecked") public HiveFunctionWrapper(Class<?> functionClz) { this.functionClz = (Class<UDFType>) functionClz; } @@ -75,6 +79,38 @@ public class HiveFunctionWrapper<UDFType> implements Serializable { this.udfSerializedBytes = serializeObjectToKryo((Serializable) serializableInstance); } + private static byte[] serializeObjectToKryo(Serializable object) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + Kryo kryo = borrowKryo(); + try { + kryo.writeObject(output, object); + } finally { + releaseKryo(kryo); + } + output.close(); + return baos.toByteArray(); + } + + private static <T extends Serializable> T deserializeObjectFromKryo( + byte[] bytes, Class<T> clazz) { + Input inp = new Input(new ByteArrayInputStream(bytes)); + Kryo kryo = borrowKryo(); + ClassLoader oldClassLoader = kryo.getClassLoader(); + kryo.setClassLoader(clazz.getClassLoader()); + T func; + + try { + func = kryo.readObject(inp, clazz); + } finally { + kryo.setClassLoader(oldClassLoader); + releaseKryo(kryo); + } + + inp.close(); + return func; + } + /** * Instantiate a Hive function instance. * @@ -124,27 +160,9 @@ public class HiveFunctionWrapper<UDFType> implements Serializable { * * @return the UDF deserialized */ + @SuppressWarnings("unchecked") private UDFType deserializeUDF() { return (UDFType) deserializeObjectFromKryo(udfSerializedBytes, (Class<Serializable>) getUDFClass()); } - - private static byte[] serializeObjectToKryo(Serializable object) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Output output = new Output(baos); - Kryo kryo = new Kryo(); - kryo.writeObject(output, object); - output.close(); - return baos.toByteArray(); - } - - private static <T extends Serializable> T deserializeObjectFromKryo( - byte[] bytes, Class<T> clazz) { - Input inp = new Input(new ByteArrayInputStream(bytes)); - Kryo kryo = new Kryo(); - kryo.setClassLoader(clazz.getClassLoader()); - T func = kryo.readObject(inp, clazz); - inp.close(); - return func; - } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java index 562e053f1c8..f798a8a4543 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java @@ -140,7 +140,7 @@ public class HiveModule implements Module { if (name.equalsIgnoreCase("internal_interval")) { return Optional.of( factory.createFunctionDefinitionFromHiveFunction( - name, HiveGenericUDFInternalInterval.class.getName())); + name, HiveGenericUDFInternalInterval.class.getName(), context)); } Optional<FunctionInfo> info = hiveShim.getBuiltInFunctionInfo(name); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java index 67b796de25b..c2bf0b1d725 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java @@ -36,10 +36,8 @@ import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.util.MutableURLClassLoader; -import org.apache.flink.util.TemporaryClassLoaderContext; import java.lang.reflect.Method; -import java.util.function.Supplier; import static org.apache.flink.table.client.gateway.context.SessionContext.SessionState;
