This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new bfd8a961 fix(custom function) support utf
     new afd8dd5c Merge pull request #212 from ni-ze/supportRsqldb
bfd8a961 is described below

commit bfd8a961aae5c94b64dd96a36e7116f0a5ea5808
Author: 维章 <[email protected]>
AuthorDate: Fri Sep 9 15:55:38 2022 +0800

    fix(custom function) support utf
---
 .../apache/rocketmq/streams/sink/RocketMQSink.java |  1 -
 .../streams/common/component/ComponentCreator.java |  2 +-
 .../streams/script/service/udf/UDFScript.java      | 26 +++++++++++-----------
 3 files changed, 14 insertions(+), 15 deletions(-)

diff --git 
a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
 
b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index a8998dad..19422c8f 100644
--- 
a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++ 
b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@ -18,7 +18,6 @@
 package org.apache.rocketmq.streams.sink;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
index 3efac351..c61d630d 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
@@ -55,7 +55,7 @@ public class ComponentCreator {
     /**
      * blink jar包所在的路径
      */
-    public static final String BLINK_UDF_JAR_PATH = 
"dipper.blink.udf.jar.path";
+    public static final String CUSTOM_FUNCTION_JAR_PATH = 
"rsldb.custom.function.jar.path";
     private static final Map<String, IComponent> key2Component = new 
HashMap<>();
     private static Properties properties;
     public static String propertiesPath;//属性文件位置,便于定期刷新
diff --git 
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/UDFScript.java
 
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/UDFScript.java
index c8f0e927..2a9b8992 100644
--- 
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/UDFScript.java
+++ 
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/UDFScript.java
@@ -22,7 +22,10 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.calssscaner.AbstractScan;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.topology.model.AbstractScript;
@@ -39,11 +42,9 @@ import 
org.apache.rocketmq.streams.script.service.IScriptUDFInit;
  * 主要是为了兼容外部的udf,或者把任意java的方法发布成函数
  */
 public class UDFScript extends AbstractScript implements IScriptUDFInit {
-
+    private static final Log LOG = LogFactory.getLog(AbstractScan.class);
     private transient ScriptComponent scriptComponent = 
ScriptComponent.getInstance();
 
-    private static final org.apache.commons.logging.Log LOG = 
LogFactory.getLog(UDFScript.class);
-
     protected transient Object instance;
 
     protected String fullClassName;//类的全限定名
@@ -76,8 +77,7 @@ public class UDFScript extends AbstractScript implements 
IScriptUDFInit {
     @Override
     protected boolean initConfigurable() {
         registFunctionSerivce(scriptComponent.getFunctionService());
-        FunctionConfigure functionConfigure =
-            
scriptComponent.getFunctionService().getFunctionConfigure(createInitMethodName(),
 this.initParameters);
+        FunctionConfigure functionConfigure = 
scriptComponent.getFunctionService().getFunctionConfigure(createInitMethodName(),
 this.initParameters);
         if (functionConfigure == null) {
             return true;
         }
@@ -155,16 +155,15 @@ public class UDFScript extends AbstractScript implements 
IScriptUDFInit {
             clazz = classLoader.loadClass(fullClassName);
             instance = clazz.newInstance();
             return true;
-        } catch (Exception e) {
-            e.printStackTrace();
+        } catch (ClassNotFoundException e) {
+            LOG.warn("can not find [" + fullClassName + "] in classpath, may 
be load in customized path.");
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new RuntimeException(e);
         }
+
         try {
             String jarUrl = getValue();
-//            if (StringUtil.isEmpty(jarUrl)) {
-//                clazz = classLoader.loadClass(fullClassName);
-//                instance = clazz.newInstance();
-//                return true;
-//            }
+
             URL url = null;
             if (isURL) {
                 url = new URL(getValue());
@@ -184,13 +183,14 @@ public class UDFScript extends AbstractScript implements 
IScriptUDFInit {
                 }
             }
 
-            URL[] urls = new URL[] {url};
+            URL[] urls = new URL[]{url};
             URLClassLoader urlClassLoader = new URLClassLoader(urls, 
classLoader);
             classLoader = urlClassLoader;
 
             clazz = classLoader.loadClass(fullClassName);
             instance = clazz.newInstance();
 
+            LOG.info("load [" + fullClassName + "] success.");
         } catch (Exception e) {
             LOG.error("加载异常," + e.getMessage(), e);
             return false;

Reply via email to