This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new deb07e99560 [FLINK-33119][table] The pojo result returned by procedure 
should be Row of fields in the pojo instead of whole pojo object (#23450)
deb07e99560 is described below

commit deb07e99560b45033a629afc3f90666ad0a32feb
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Sep 22 16:28:17 2023 +0800

    [FLINK-33119][table] The pojo result returned by procedure should be Row of 
fields in the pojo instead of whole pojo object (#23450)
---
 .../operations/PlannerCallProcedureOperation.java  | 53 ++++++++++++++++++----
 .../runtime/stream/sql/ProcedureITCase.java        |  3 +-
 2 files changed, 46 insertions(+), 10 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
index 134d2fd2737..a96fbf65f56 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.conversion.DataStructureConverter;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.data.conversion.RowRowConverter;
 import org.apache.flink.table.operations.CallProcedureOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
@@ -54,6 +55,8 @@ import org.apache.flink.util.CloseableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.lang.reflect.Array;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -67,6 +70,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.types.extraction.ExtractionUtils.isAssignable;
+import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.STRUCTURED_TYPE;
 
 /** Wrapper for valid call procedure operation generated by Planner. */
 public class PlannerCallProcedureOperation implements CallProcedureOperation {
@@ -219,10 +223,18 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
                             ProcedureDefinition.PROCEDURE_CALL, 
e.getMessage()),
                     e);
         } catch (InvocationTargetException e) {
+            Throwable exceptionInMethod = e.getTargetException();
             throw new TableException(
                     String.format(
-                            "Can't involve the method %s.", 
ProcedureDefinition.PROCEDURE_CALL),
+                            "The %s method caused an error: %s.",
+                            ProcedureDefinition.PROCEDURE_CALL, 
exceptionInMethod.getMessage()),
                     e);
+        } catch (Throwable t) {
+            throw new TableException(
+                    String.format(
+                            "An error occurred while invoking the procedure's 
%s method: %s.",
+                            ProcedureDefinition.PROCEDURE_CALL, 
t.getMessage()),
+                    t);
         }
     }
 
@@ -237,6 +249,16 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
             tableResultType = DataTypes.ROW(DataTypes.FIELD("result", 
tableResultType));
         }
 
+        RowRowConverter rowConverter = null;
+        // if the output is struct type,
+        // we need a row converter to help convert it to Row.
+        // we will first convert the struct value to RowData, and then use the 
row converter
+        // to convert the RowData to Row.
+        if (outputType.getLogicalType().getTypeRoot() == STRUCTURED_TYPE) {
+            rowConverter = RowRowConverter.create(tableResultType);
+            rowConverter.open(userClassLoader);
+        }
+
         // expand the result type to schema
         ResolvedSchema resultSchema = 
DataTypeUtils.expandCompositeTypeToSchema(tableResultType);
         RowDataToStringConverter rowDataToStringConverter =
@@ -255,7 +277,7 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
         return TableResultImpl.builder()
                 .resultProvider(
                         new CallProcedureResultProvider(
-                                converter, rowDataToStringConverter, 
procedureResult))
+                                converter, rowDataToStringConverter, 
rowConverter, procedureResult))
                 .schema(resultSchema)
                 .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                 .build();
@@ -277,15 +299,20 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
 
         private final DataStructureConverter<Object, Object> converter;
         private final RowDataToStringConverter toStringConverter;
+
+        // a converter to convert internal RowData to Row
+        private final @Nullable RowRowConverter rowConverter;
         private final Object[] result;
 
         public CallProcedureResultProvider(
                 DataStructureConverter<Object, Object> converter,
                 RowDataToStringConverter toStringConverter,
+                @Nullable RowRowConverter rowConverter,
                 Object result) {
             this.converter = converter;
             this.toStringConverter = toStringConverter;
             this.result = toResultArray(result);
+            this.rowConverter = rowConverter;
         }
 
         @Override
@@ -305,11 +332,7 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
 
                 @Override
                 public RowData next() {
-                    Object element = 
converter.toInternalOrNull(objectIterator.next());
-                    if (!(element instanceof RowData)) {
-                        return GenericRowData.of(element);
-                    }
-                    return (RowData) element;
+                    return toRowData(objectIterator.next());
                 }
 
                 @Override
@@ -317,6 +340,14 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
             };
         }
 
+        private RowData toRowData(Object externalValue) {
+            Object element = converter.toInternalOrNull(externalValue);
+            if (!(element instanceof RowData)) {
+                return GenericRowData.of(element);
+            }
+            return (RowData) element;
+        }
+
         @Override
         public CloseableIterator<Row> toExternalIterator() {
             Iterator<Object> objectIterator = Arrays.stream(result).iterator();
@@ -331,7 +362,13 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
                 public Row next() {
                     Object element = objectIterator.next();
                     if (!(element instanceof Row)) {
-                        return Row.of(element);
+                        if (rowConverter != null) {
+                            // first convert the external value to internal 
RowData,
+                            // then convert the RowData to Row
+                            return rowConverter.toExternal(toRowData(element));
+                        } else {
+                            return Row.of(element);
+                        }
                     }
                     return (Row) element;
                 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
index efb6121065b..da3262b6f12 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
@@ -166,8 +166,7 @@ public class ProcedureITCase extends StreamingTestBase {
         tableResult = tEnv().executeSql("call `system`.generate_user('yuxia', 
18)");
         verifyTableResult(
                 tableResult,
-                Collections.singletonList(
-                        Row.of(new 
TestProcedureCatalogFactory.UserPojo("yuxia", 18))),
+                Collections.singletonList(Row.of("yuxia", 18)),
                 ResolvedSchema.of(
                         Column.physical("name", DataTypes.STRING()),
                         Column.physical("age", 
DataTypes.INT().notNull().bridgedTo(int.class))));

Reply via email to