This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new bfd8a961 fix(custom function) support utf
new afd8dd5c Merge pull request #212 from ni-ze/supportRsqldb
bfd8a961 is described below
commit bfd8a961aae5c94b64dd96a36e7116f0a5ea5808
Author: 维章 <[email protected]>
AuthorDate: Fri Sep 9 15:55:38 2022 +0800
fix(custom function) support utf
---
.../apache/rocketmq/streams/sink/RocketMQSink.java | 1 -
.../streams/common/component/ComponentCreator.java | 2 +-
.../streams/script/service/udf/UDFScript.java | 26 +++++++++++-----------
3 files changed, 14 insertions(+), 15 deletions(-)
diff --git
a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index a8998dad..19422c8f 100644
---
a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++
b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@ -18,7 +18,6 @@
package org.apache.rocketmq.streams.sink;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
index 3efac351..c61d630d 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
@@ -55,7 +55,7 @@ public class ComponentCreator {
/**
* blink jar包所在的路径
*/
- public static final String BLINK_UDF_JAR_PATH =
"dipper.blink.udf.jar.path";
+ public static final String CUSTOM_FUNCTION_JAR_PATH =
"rsldb.custom.function.jar.path";
private static final Map<String, IComponent> key2Component = new
HashMap<>();
private static Properties properties;
public static String propertiesPath;//属性文件位置,便于定期刷新
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/UDFScript.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/UDFScript.java
index c8f0e927..2a9b8992 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/UDFScript.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/udf/UDFScript.java
@@ -22,7 +22,10 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Map;
+
+import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.calssscaner.AbstractScan;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.topology.model.AbstractScript;
@@ -39,11 +42,9 @@ import
org.apache.rocketmq.streams.script.service.IScriptUDFInit;
* 主要是为了兼容外部的udf,或者把任意java的方法发布成函数
*/
public class UDFScript extends AbstractScript implements IScriptUDFInit {
-
+ private static final Log LOG = LogFactory.getLog(AbstractScan.class);
private transient ScriptComponent scriptComponent =
ScriptComponent.getInstance();
- private static final org.apache.commons.logging.Log LOG =
LogFactory.getLog(UDFScript.class);
-
protected transient Object instance;
protected String fullClassName;//类的全限定名
@@ -76,8 +77,7 @@ public class UDFScript extends AbstractScript implements
IScriptUDFInit {
@Override
protected boolean initConfigurable() {
registFunctionSerivce(scriptComponent.getFunctionService());
- FunctionConfigure functionConfigure =
-
scriptComponent.getFunctionService().getFunctionConfigure(createInitMethodName(),
this.initParameters);
+ FunctionConfigure functionConfigure =
scriptComponent.getFunctionService().getFunctionConfigure(createInitMethodName(),
this.initParameters);
if (functionConfigure == null) {
return true;
}
@@ -155,16 +155,15 @@ public class UDFScript extends AbstractScript implements
IScriptUDFInit {
clazz = classLoader.loadClass(fullClassName);
instance = clazz.newInstance();
return true;
- } catch (Exception e) {
- e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ LOG.warn("can not find [" + fullClassName + "] in classpath, may
be load in customized path.");
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(e);
}
+
try {
String jarUrl = getValue();
-// if (StringUtil.isEmpty(jarUrl)) {
-// clazz = classLoader.loadClass(fullClassName);
-// instance = clazz.newInstance();
-// return true;
-// }
+
URL url = null;
if (isURL) {
url = new URL(getValue());
@@ -184,13 +183,14 @@ public class UDFScript extends AbstractScript implements
IScriptUDFInit {
}
}
- URL[] urls = new URL[] {url};
+ URL[] urls = new URL[]{url};
URLClassLoader urlClassLoader = new URLClassLoader(urls,
classLoader);
classLoader = urlClassLoader;
clazz = classLoader.loadClass(fullClassName);
instance = clazz.newInstance();
+ LOG.info("load [" + fullClassName + "] success.");
} catch (Exception e) {
LOG.error("加载异常," + e.getMessage(), e);
return false;