CosmosNi commented on code in PR #7928:
URL: https://github.com/apache/seatunnel/pull/7928#discussion_r1829310163
##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java:
##########
@@ -665,4 +690,153 @@ private Object executeBinaryExpr(BinaryExpression
binaryExpression, Object[] inp
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
String.format("Unsupported SQL Expression: %s ",
binaryExpression));
}
+
+ public List<SeaTunnelRow> lateralView(
+ List<SeaTunnelRow> seaTunnelRows,
+ List<LateralView> lateralViews,
+ SeaTunnelRowType outRowType) {
+ for (LateralView lateralView : lateralViews) {
+ Function function = lateralView.getGeneratorFunction();
+ boolean isUsingOuter = lateralView.isUsingOuter();
+ String functionName = function.getName();
+ if (EXPLODE.equalsIgnoreCase(functionName)) {
+ seaTunnelRows = explode(seaTunnelRows, function, outRowType,
isUsingOuter);
+ } else {
+ throw new SeaTunnelRuntimeException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ "Transform config error! UnSupport function:" +
functionName);
+ }
+ }
+
+ return seaTunnelRows;
+ }
+
+ private List<SeaTunnelRow> explode(
+ List<SeaTunnelRow> seaTunnelRows,
+ Function lateralViewFunction,
+ SeaTunnelRowType outRowType,
+ boolean isUsingOuter) {
+ ExpressionList<?> expressions = lateralViewFunction.getParameters();
+ for (Expression expression : expressions) {
+ if (expression instanceof Column) {
+ String column = ((Column) expression).getColumnName();
+ List<SeaTunnelRow> next = new ArrayList<>();
+ for (SeaTunnelRow row : seaTunnelRows) {
+ int fieldIndex = outRowType.indexOf(column);
+ Object splitFieldValue = row.getField(fieldIndex);
+ if (splitFieldValue == null) {
+ continue;
+ }
+ if (splitFieldValue instanceof Object[]) {
+ Object[] rowList = (Object[]) splitFieldValue;
+ if (ArrayUtils.isEmpty(rowList) && isUsingOuter) {
+ SeaTunnelRow outputRow = row.copy();
+ outputRow.setField(fieldIndex, null);
+ next.add(outputRow);
+ } else {
+ for (Object fieldValue : rowList) {
+ SeaTunnelRow outputRow = row.copy();
+ outputRow.setField(fieldIndex, fieldValue);
+ next.add(outputRow);
+ }
+ }
+ }
+ }
+ seaTunnelRows = next;
+ }
+ if (expression instanceof Function) {
+ Function function = (Function) expression;
+ String functionName = function.getName();
+ if (SPILT.equalsIgnoreCase(functionName)) {
Review Comment:
This seems different: here it will split the value into multiple rows,
whereas the split function is meant for producing multiple fields.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]