This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new deb07e99560 [FLINK-33119][table] The pojo result returned by a procedure
should be a Row of the fields in the pojo instead of the whole pojo object (#23450)
deb07e99560 is described below
commit deb07e99560b45033a629afc3f90666ad0a32feb
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Sep 22 16:28:17 2023 +0800
[FLINK-33119][table] The pojo result returned by a procedure should be a Row of
the fields in the pojo instead of the whole pojo object (#23450)
---
.../operations/PlannerCallProcedureOperation.java | 53 ++++++++++++++++++----
.../runtime/stream/sql/ProcedureITCase.java | 3 +-
2 files changed, 46 insertions(+), 10 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
index 134d2fd2737..a96fbf65f56 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.operations.CallProcedureOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;
@@ -54,6 +55,8 @@ import org.apache.flink.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -67,6 +70,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.flink.table.types.extraction.ExtractionUtils.isAssignable;
+import static
org.apache.flink.table.types.logical.LogicalTypeRoot.STRUCTURED_TYPE;
/** Wrapper for valid call procedure operation generated by Planner. */
public class PlannerCallProcedureOperation implements CallProcedureOperation {
@@ -219,10 +223,18 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
ProcedureDefinition.PROCEDURE_CALL,
e.getMessage()),
e);
} catch (InvocationTargetException e) {
+ Throwable exceptionInMethod = e.getTargetException();
throw new TableException(
String.format(
- "Can't involve the method %s.",
ProcedureDefinition.PROCEDURE_CALL),
+ "The %s method caused an error: %s.",
+ ProcedureDefinition.PROCEDURE_CALL,
exceptionInMethod.getMessage()),
e);
+ } catch (Throwable t) {
+ throw new TableException(
+ String.format(
+ "An error occurred while invoking the procedure's
%s method: %s.",
+ ProcedureDefinition.PROCEDURE_CALL,
t.getMessage()),
+ t);
}
}
@@ -237,6 +249,16 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
tableResultType = DataTypes.ROW(DataTypes.FIELD("result",
tableResultType));
}
+ RowRowConverter rowConverter = null;
+ // if the output is struct type,
+ // we need a row converter to help convert it to Row.
+ // we will first convert the struct value to RowData, and then use the
row converter
+ // to convert the RowData to Row.
+ if (outputType.getLogicalType().getTypeRoot() == STRUCTURED_TYPE) {
+ rowConverter = RowRowConverter.create(tableResultType);
+ rowConverter.open(userClassLoader);
+ }
+
// expand the result type to schema
ResolvedSchema resultSchema =
DataTypeUtils.expandCompositeTypeToSchema(tableResultType);
RowDataToStringConverter rowDataToStringConverter =
@@ -255,7 +277,7 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
return TableResultImpl.builder()
.resultProvider(
new CallProcedureResultProvider(
- converter, rowDataToStringConverter,
procedureResult))
+ converter, rowDataToStringConverter,
rowConverter, procedureResult))
.schema(resultSchema)
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.build();
@@ -277,15 +299,20 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
private final DataStructureConverter<Object, Object> converter;
private final RowDataToStringConverter toStringConverter;
+
+ // a converter to convert internal RowData to Row
+ private final @Nullable RowRowConverter rowConverter;
private final Object[] result;
public CallProcedureResultProvider(
DataStructureConverter<Object, Object> converter,
RowDataToStringConverter toStringConverter,
+ @Nullable RowRowConverter rowConverter,
Object result) {
this.converter = converter;
this.toStringConverter = toStringConverter;
this.result = toResultArray(result);
+ this.rowConverter = rowConverter;
}
@Override
@@ -305,11 +332,7 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
@Override
public RowData next() {
- Object element =
converter.toInternalOrNull(objectIterator.next());
- if (!(element instanceof RowData)) {
- return GenericRowData.of(element);
- }
- return (RowData) element;
+ return toRowData(objectIterator.next());
}
@Override
@@ -317,6 +340,14 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
};
}
+ private RowData toRowData(Object externalValue) {
+ Object element = converter.toInternalOrNull(externalValue);
+ if (!(element instanceof RowData)) {
+ return GenericRowData.of(element);
+ }
+ return (RowData) element;
+ }
+
@Override
public CloseableIterator<Row> toExternalIterator() {
Iterator<Object> objectIterator = Arrays.stream(result).iterator();
@@ -331,7 +362,13 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
public Row next() {
Object element = objectIterator.next();
if (!(element instanceof Row)) {
- return Row.of(element);
+ if (rowConverter != null) {
+ // first convert the extern value to internal
RowData,
+ // then convert the RowData to Row
+ return rowConverter.toExternal(toRowData(element));
+ } else {
+ return Row.of(element);
+ }
}
return (Row) element;
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
index efb6121065b..da3262b6f12 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
@@ -166,8 +166,7 @@ public class ProcedureITCase extends StreamingTestBase {
tableResult = tEnv().executeSql("call `system`.generate_user('yuxia',
18)");
verifyTableResult(
tableResult,
- Collections.singletonList(
- Row.of(new
TestProcedureCatalogFactory.UserPojo("yuxia", 18))),
+ Collections.singletonList(Row.of("yuxia", 18)),
ResolvedSchema.of(
Column.physical("name", DataTypes.STRING()),
Column.physical("age",
DataTypes.INT().notNull().bridgedTo(int.class))));