This is an automated email from the ASF dual-hosted git repository.

srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 5071629  SAMZA-2230: Support for ANY type arguments in udfs that 
expect strongly typed arguments (#1060)
5071629 is described below

commit 5071629cdb432143363223259b72c2dde1278271
Author: Srinivasulu Punuru <[email protected]>
AuthorDate: Thu Jul 11 10:26:33 2019 -0700

    SAMZA-2230: Support for ANY type arguments in udfs that expect strongly 
typed arguments (#1060)
    
    * Support for ANY type arguments in udfs that expect strongly typed 
arguments
    
    * Rename the testcase
    
    * Adding license
    
    * Fixed the type name in log
    
    * Fix for the testcase
---
 .../sql/planner/SamzaSqlScalarFunctionImpl.java     | 15 ++++++++++++++-
 .../sql/runner/TestSamzaSqlApplicationConfig.java   |  3 +--
 .../sql/util/{MyTestUdf.java => MyTestObjUdf.java}  | 21 ++++++---------------
 .../org/apache/samza/sql/util/MyTestPolyUdf.java    |  3 +--
 .../java/org/apache/samza/sql/util/MyTestUdf.java   |  6 ------
 .../apache/samza/sql/util/SamzaSqlTestConfig.java   |  3 ++-
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java   | 20 ++++++++++++++++++++
 7 files changed, 44 insertions(+), 27 deletions(-)

diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
index 0793bce..b2b6119 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlScalarFunctionImpl.java
@@ -20,6 +20,7 @@
 package org.apache.samza.sql.planner;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -81,8 +82,20 @@ public class SamzaSqlScalarFunctionImpl implements 
ScalarFunction, Implementable
       final Expression samzaContext = 
Expressions.parameter(SamzaSqlExecutionContext.class, "context");
       final Expression getUdfInstance = Expressions.call(ScalarUdf.class, 
sqlContext, getUdfMethod,
           Expressions.constant(udfMethod.getDeclaringClass().getName()), 
Expressions.constant(udfName), samzaContext);
+
+      List<Expression> convertedOperands = new ArrayList<>();
+      // SAMZA: 2230 To allow UDFS to accept Untyped arguments.
+      // We explicitly Convert the untyped arguments to type that the UDf 
expects.
+      for(int index = 0; index < translatedOperands.size(); index++) {
+        if (translatedOperands.get(index).type == Object.class && 
udfMethod.getParameters()[index].getType() != Object.class) {
+          
convertedOperands.add(Expressions.convert_(translatedOperands.get(index), 
udfMethod.getParameters()[index].getType()));
+        } else {
+          convertedOperands.add(translatedOperands.get(index));
+        }
+      }
+
       final Expression callExpression = 
Expressions.call(Expressions.convert_(getUdfInstance, 
udfMethod.getDeclaringClass()), udfMethod,
-          translatedOperands);
+          convertedOperands);
       return callExpression;
     }, NullPolicy.NONE, false);
   }
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index c6fb357..7db38f7 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -55,8 +55,7 @@ public class TestSamzaSqlApplicationConfig {
             .collect(Collectors.toList()),
         
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
-    // Two of the UDFs has an overload, hence + 1.
-    Assert.assertEquals(numUdfs + 2, 
samzaSqlApplicationConfig.getUdfMetadata().size());
+    Assert.assertEquals(numUdfs + 1, 
samzaSqlApplicationConfig.getUdfMetadata().size());
     Assert.assertEquals(1, 
samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size());
     Assert.assertEquals(1, 
samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size());
   }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java 
b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestObjUdf.java
similarity index 77%
copy from samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
copy to samza-sql/src/test/java/org/apache/samza/sql/util/MyTestObjUdf.java
index d0ac517..14a68f4 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestObjUdf.java
@@ -20,7 +20,6 @@
 package org.apache.samza.sql.util;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
 import org.apache.samza.sql.schema.SamzaSqlFieldType;
 import org.apache.samza.sql.udfs.SamzaSqlUdf;
 import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
@@ -32,26 +31,18 @@ import org.slf4j.LoggerFactory;
 /**
  * Test UDF used by unit and integration tests.
  */
-@SamzaSqlUdf(name = "MyTest", description = "Test UDF.")
-public class MyTestUdf implements ScalarUdf {
+@SamzaSqlUdf(name = "MyTestObj", description = "Test UDF.")
+public class MyTestObjUdf implements ScalarUdf {
 
-  private static final Logger LOG = LoggerFactory.getLogger(MyTestUdf.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(MyTestObjUdf.class);
 
   @SamzaSqlUdfMethod(params = SamzaSqlFieldType.INT32)
-  public Integer execute(Integer value) {
-    return value * 2;
+  public Object execute(Integer value) {
+    return value;
   }
 
-  @SamzaSqlUdfMethod(params = SamzaSqlFieldType.ANY)
-  public Integer execute(Object value) {
-    return ((Integer) value) * 2;
-  }
-
-
   @Override
-  public void init(Config udfConfig, Context context) {
+  public void init(Config udfConfig) {
     LOG.info("Init called with {}", udfConfig);
   }
 }
-
-
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java 
b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java
index 29769c0..11f6b1b 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestPolyUdf.java
@@ -40,12 +40,11 @@ public class MyTestPolyUdf implements ScalarUdf {
     return value * 2;
   }
 
-  @SamzaSqlUdfMethod(params = SamzaSqlFieldType.ANY)
+  @SamzaSqlUdfMethod(params = SamzaSqlFieldType.STRING)
   public Integer execute(String value) {
     return value.length() * 2;
   }
 
-
   @Override
   public void init(Config udfConfig, Context context) {
     LOG.info("Init called with {}", udfConfig);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java 
b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
index d0ac517..983df51 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/MyTestUdf.java
@@ -42,12 +42,6 @@ public class MyTestUdf implements ScalarUdf {
     return value * 2;
   }
 
-  @SamzaSqlUdfMethod(params = SamzaSqlFieldType.ANY)
-  public Integer execute(Object value) {
-    return ((Integer) value) * 2;
-  }
-
-
   @Override
   public void init(Config udfConfig, Context context) {
     LOG.info("Init called with {}", udfConfig);
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java 
b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
index 364d0a9..a5b03a0 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
@@ -96,7 +96,8 @@ public class SamzaSqlTestConfig {
         ConfigBasedUdfResolver.class.getName());
     staticConfigs.put(configUdfResolverDomain + 
ConfigBasedUdfResolver.CFG_UDF_CLASSES, Joiner.on(",")
         .join(MyTestUdf.class.getName(), RegexMatchUdf.class.getName(), 
FlattenUdf.class.getName(),
-            MyTestArrayUdf.class.getName(), 
BuildOutputRecordUdf.class.getName(), MyTestPolyUdf.class.getName()));
+            MyTestArrayUdf.class.getName(), 
BuildOutputRecordUdf.class.getName(), MyTestPolyUdf.class.getName(),
+            MyTestObjUdf.class.getName()));
 
     String avroSystemConfigPrefix =
         String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, 
SAMZA_SYSTEM_TEST_AVRO);
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 6b95b27..aef7926 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -479,6 +479,26 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
+  public void testUdfUnTypedArgumentToTypedUdf() {
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+        + "select id, MyTest(MyTestObj(id)) as long_value from 
testavro.SIMPLE1";
+    List<String> sqlStmts = Collections.singletonList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    LOG.info("output Messages " + TestAvroSystemFactory.messages);
+
+    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("long_value").toString()))
+        .sorted()
+        .collect(Collectors.toList());
+    Assert.assertEquals(outMessages.size(), numMessages);
+  }
+
+  @Test
   public void testEndToEndUdf() throws Exception {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();

Reply via email to