yyy1000 commented on code in PR #9395:
URL: https://github.com/apache/arrow-datafusion/pull/9395#discussion_r1509460509
##########
datafusion/proto/Cargo.toml:
##########
@@ -55,5 +55,6 @@ serde_json = { workspace = true, optional = true }
[dev-dependencies]
doc-comment = { workspace = true }
+regex = "1.10.3"
Review Comment:
I think adding this dependency for tests is OK?
##########
datafusion/proto/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -756,6 +767,113 @@ impl LogicalExtensionCodec for TopKExtensionCodec {
}
}
+#[derive(Debug)]
+struct MyRegexUdf {
+ signature: Signature,
+ // compiled regex
+ _regex: Regex,
+ // regex as original string
+ pattern: String,
+}
+
+impl MyRegexUdf {
+ fn new(_regex: Regex, pattern: String) -> Self {
+ Self {
+ signature: Signature::uniform(
+ 1,
+ vec![DataType::Int32],
+ Volatility::Immutable,
+ ),
+ _regex,
+ pattern,
+ }
+ }
+}
+
+/// Implement the ScalarUDFImpl trait for MyRegexUdf
+impl ScalarUDFImpl for MyRegexUdf {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+ fn name(&self) -> &str {
+ "regex_udf"
+ }
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+ fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+ if !matches!(args.first(), Some(&DataType::Utf8)) {
+ return plan_err!("regex_udf only accepts Utf8 arguments");
+ }
+ Ok(DataType::Int32)
+ }
+ fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ unimplemented!()
+ }
+}
+
+#[derive(Debug)]
+pub struct ScalarUDFExtensionCodec {}
+
+impl LogicalExtensionCodec for ScalarUDFExtensionCodec {
+ fn try_decode(
+ &self,
+ _buf: &[u8],
+ _inputs: &[LogicalPlan],
+ _ctx: &SessionContext,
+ ) -> Result<Extension> {
+ not_impl_err!("No extension codec provided")
+ }
+
+ fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
+ not_impl_err!("No extension codec provided")
+ }
+
+ fn try_decode_table_provider(
+ &self,
+ _buf: &[u8],
+ _schema: SchemaRef,
+ _ctx: &SessionContext,
+ ) -> Result<Arc<dyn TableProvider>> {
+ internal_err!("unsupported plan type")
+ }
+
+ fn try_encode_table_provider(
+ &self,
+ _node: Arc<dyn TableProvider>,
+ _buf: &mut Vec<u8>,
+ ) -> Result<()> {
+ internal_err!("unsupported plan type")
+ }
+
+ fn try_decode_udf(&self, _name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
+ if let Ok(proto) = proto::MyRegexUdfNode::decode(buf) {
+ let regex = Regex::new(&proto.pattern).map_err(|_err| ..);
+ match regex {
+ Ok(regex) => Ok(Arc::new(ScalarUDF::new_from_impl(MyRegexUdf::new(
+ regex,
+ proto.pattern,
+ )))),
+ Err(e) => internal_err!("unsupported regex pattern {e:?}"),
+ }
+ } else {
+ not_impl_err!("unrecognized scalar UDF implementation, cannot decode")
+ }
+ }
+
+ fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
+ let binding = node.inner();
+ let udf = binding.as_any().downcast_ref::<MyRegexUdf>().unwrap();
+ let proto = proto::MyRegexUdfNode {
+ pattern: udf.pattern.clone(),
+ };
+ proto.encode(buf).map_err(|e| {
+ DataFusionError::Internal(format!("failed to encode udf: {e:?}"))
+ })?;
+ Ok(())
+ }
Review Comment:
This is the encode and decode logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]