nodece commented on code in PR #23811:
URL: https://github.com/apache/pulsar/pull/23811#discussion_r1923840366


##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java:
##########


Review Comment:
   Please don't reformat this file; formatting changes will cause conflicts 
when this PR is cherry-picked to other versions.



##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java:
##########
@@ -202,7 +234,8 @@ public void testFunctionResultNull() throws Exception {
 
     @NotNull
     private JavaInstanceRunnable getJavaInstanceRunnable(boolean autoAck,
-                                                         org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) throws Exception {
+                                                         org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees)

Review Comment:
   Please revert these changes.



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########


Review Comment:
   DO NOT format the unmodified code.



##########
pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java:
##########


Review Comment:
   DO NOT format the unmodified code.



##########
pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java:
##########
@@ -1204,4 +1307,12 @@ public ByteBuffer process(ByteBuffer input, Context context) throws Exception {
             return input;
         }
     }
+
+    public static class SpendTimeFunction implements org.apache.pulsar.functions.api.Function<String, String> {

Review Comment:
   
   `org.apache.pulsar.functions.api.examples.JavaNativeAsyncExclamationFunction` 
   can be used instead of this function, so please remove this class.
   
   BTW, this is a sync function, not an async one.
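   
   To make the sync/async distinction concrete, here is a minimal sketch. It is 
   not the source of the example class; `SyncExclaim` and `AsyncExclaim` are 
   hypothetical names, and `java.util.function.Function` is used as a stand-in. 
   It assumes, as I understand the runtime, that a function is treated as async 
   when it returns a `CompletableFuture`:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Function;

   // Sync (hypothetical): the calling thread is blocked until the value is returned.
   class SyncExclaim implements Function<String, String> {
       @Override
       public String apply(String input) {
           return input + "!";
       }
   }

   // Async (hypothetical): returns at once with a future that completes later
   // on another thread.
   class AsyncExclaim implements Function<String, CompletableFuture<String>> {
       @Override
       public CompletableFuture<String> apply(String input) {
           return CompletableFuture.supplyAsync(() -> {
               try {
                   Thread.sleep(100); // simulated slow work
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new IllegalStateException(e);
               }
               return input + "!";
           });
       }
   }
   ```
   
   A test of async timing only exercises the async path if the function under 
   test has the second shape.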



##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java:
##########
@@ -173,6 +184,27 @@ public Void process(String input, Context context) throws Exception {
         }
     }
 
+    @Test
+    public void testFunctionAsyncTime() throws Exception {
+        FunctionDetails functionDetails = FunctionDetails.newBuilder()
+                .setAutoAck(true)
+                .setProcessingGuarantees(org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.MANUAL)
+                .build();
+        JavaInstanceRunnable javaInstanceRunnable = createRunnable(functionDetails);
+        Field stats = JavaInstanceRunnable.class.getDeclaredField("stats");

Review Comment:
   DO NOT use Java reflection to set `stats`.
   
   Please add a setter to `JavaInstanceRunnable`:
   ```
       @VisibleForTesting
       void setStats(ComponentStatsManager stats) {
           this.stats = stats;
       }
   ```
   Then call this method in the test to set the mocked `manager` as the `stats` value.
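   
   A self-contained sketch of the pattern, for illustration only. `StatsSink`, 
   `Runner`, and `RecordingStats` are hypothetical stand-ins for 
   `ComponentStatsManager`, `JavaInstanceRunnable`, and the Mockito mock; only 
   the setter-injection idea carries over:
   
   ```java
   // Hypothetical stand-in for the stats manager type.
   interface StatsSink {
       void processTimeEnd(long startTime);
   }

   // Hypothetical stand-in for the class under test.
   class Runner {
       private StatsSink stats;

       // Package-private setter, analogous to the @VisibleForTesting setStats above.
       void setStats(StatsSink stats) {
           this.stats = stats;
       }

       // Forwards the start time to the stats object.
       void handleResult(long startTime) {
           stats.processTimeEnd(startTime);
       }
   }

   // Hand-written fake that records what it was called with.
   class RecordingStats implements StatsSink {
       long lastStartTime = -1;
       int calls = 0;

       @Override
       public void processTimeEnd(long startTime) {
           lastStartTime = startTime;
           calls++;
       }
   }
   ```
   
   In the test from this hunk, the `getDeclaredField` / `setAccessible` / `set` 
   lines would then reduce to a single `javaInstanceRunnable.setStats(manager);` call.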



##########
pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java:
##########
@@ -296,6 +305,92 @@ public void testReadCompactedFunction() throws Exception {
         producer.close();
     }
 
+    @Test(timeOut = 20000)
+    public void testPulsarFunctionAsyncStatTime() throws Exception {
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = SpendTimeFunction.class.getName();

Review Comment:
   ```suggestion
           final String functionName = "org.apache.pulsar.functions.api.examples.JavaNativeAsyncExclamationFunction";
   ```



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java:
##########


Review Comment:
   DO NOT format the unmodified code.



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java:
##########


Review Comment:
   DO NOT format the unmodified code.



##########
pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java:
##########
@@ -296,6 +305,92 @@ public void testReadCompactedFunction() throws Exception {
         producer.close();
     }
 
+    @Test(timeOut = 20000)
+    public void testPulsarFunctionAsyncStatTime() throws Exception {
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";

Review Comment:
   ```suggestion
   ```



##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java:
##########
@@ -173,6 +184,27 @@ public Void process(String input, Context context) throws Exception {
         }
     }
 
+    @Test
+    public void testFunctionAsyncTime() throws Exception {
+        FunctionDetails functionDetails = FunctionDetails.newBuilder()
+                .setAutoAck(true)
+                .setProcessingGuarantees(org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.MANUAL)
+                .build();
+        JavaInstanceRunnable javaInstanceRunnable = createRunnable(functionDetails);
+        Field stats = JavaInstanceRunnable.class.getDeclaredField("stats");
+        FunctionStatsManager manager = mock(FunctionStatsManager.class);
+        stats.setAccessible(true);
+        stats.set(javaInstanceRunnable, manager);
+        stats.setAccessible(false);
+        JavaExecutionResult javaExecutionResult = new JavaExecutionResult();
+        Thread.sleep(500);
+        Record record = mock(Record.class);
+        javaInstanceRunnable.handleResult(record, javaExecutionResult);
+        ArgumentCaptor<Long> timeCaptor = ArgumentCaptor.forClass(Long.class);
+        verify(manager).processTimeEnd(timeCaptor.capture());
+        Assert.assertEquals(timeCaptor.getValue(), javaExecutionResult.getStartTime() + 500, 100);

Review Comment:
   ```suggestion
           Assert.assertEquals(timeCaptor.getValue(), javaExecutionResult.getStartTime());
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java:
##########
@@ -93,6 +95,8 @@ protected static FunctionConfig createFunctionConfig(String tenant, String names
         functionConfig.setAutoAck(true);
         if (!isBuiltin) {
             functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+        } else {
+            functionConfig.setClassName(functionName);

Review Comment:
   Why add this line?



##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java:
##########
@@ -255,10 +288,10 @@ public void testSourceConfigParsingPreservesOriginalType() throws Exception {
     public Object[][] component() {
         return new Object[][]{
                 // Schema: component type, whether to map in secrets
-                { FunctionDetails.ComponentType.SINK },
-                { FunctionDetails.ComponentType.SOURCE },
-                { FunctionDetails.ComponentType.FUNCTION },
-                { FunctionDetails.ComponentType.UNKNOWN },
+                {FunctionDetails.ComponentType.SINK},

Review Comment:
   Please revert these changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
