Hisoka-X commented on code in PR #7928:
URL: https://github.com/apache/seatunnel/pull/7928#discussion_r1834019325
##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java:
##########
@@ -665,4 +699,180 @@ private Object executeBinaryExpr(BinaryExpression
binaryExpression, Object[] inp
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
String.format("Unsupported SQL Expression: %s ",
binaryExpression));
}
+
+ public List<SeaTunnelRow> lateralView(
+ List<SeaTunnelRow> seaTunnelRows,
+ List<LateralView> lateralViews,
+ SeaTunnelRowType outRowType) {
+ for (LateralView lateralView : lateralViews) {
+ Function function = lateralView.getGeneratorFunction();
+ boolean isUsingOuter = lateralView.isUsingOuter();
+ String functionName = function.getName();
+ String alias = lateralView.getColumnAlias().getName();
+ if (EXPLODE.equalsIgnoreCase(functionName)) {
+ seaTunnelRows = explode(seaTunnelRows, function, outRowType,
isUsingOuter, alias);
+ } else {
+ throw new SeaTunnelRuntimeException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ "Transform config error! UnSupport function:" +
functionName);
+ }
+ }
+
+ return seaTunnelRows;
+ }
+
+ private List<SeaTunnelRow> explode(
+ List<SeaTunnelRow> seaTunnelRows,
+ Function lateralViewFunction,
+ SeaTunnelRowType outRowType,
+ boolean isUsingOuter,
+ String alias) {
+ ExpressionList<?> expressions = lateralViewFunction.getParameters();
+ int aliasFieldIndex = outRowType.indexOf(alias);
+ for (Expression expression : expressions) {
+ if (expression instanceof Column) {
+ String column = ((Column) expression).getColumnName();
+ List<SeaTunnelRow> next = new ArrayList<>();
+ for (SeaTunnelRow row : seaTunnelRows) {
+ int fieldIndex = outRowType.indexOf(column);
+ Object splitFieldValue = row.getField(fieldIndex);
+ transformExplodeValue(
+ splitFieldValue,
+ outRowType,
+ isUsingOuter,
+ next,
+ aliasFieldIndex,
+ row,
+ expression,
+ true);
+ }
+ seaTunnelRows = next;
+ } else if (expression instanceof Function) {
+ List<SeaTunnelRow> next = new ArrayList<>();
+ for (SeaTunnelRow row : seaTunnelRows) {
+ Object splitFieldValue = computeForValue(expression,
row.getFields());
+ transformExplodeValue(
+ splitFieldValue,
+ outRowType,
+ isUsingOuter,
+ next,
+ aliasFieldIndex,
+ row,
+ expression,
+ false);
+ }
+ seaTunnelRows = next;
+ }
+ }
+ return seaTunnelRows;
+ }
+
+ private void transformExplodeValue(
+ Object splitFieldValue,
+ SeaTunnelRowType outRowType,
+ boolean isUsingOuter,
+ List<SeaTunnelRow> next,
+ int aliasFieldIndex,
+ SeaTunnelRow row,
+ Expression expression,
+ boolean keepValueType) {
+ if (splitFieldValue == null) {
+ if (isUsingOuter) {
+ next.add(copySeaTunnelRow(outRowType.getTotalFields(), row,
aliasFieldIndex, null));
+ }
+ return;
+ }
+ if (splitFieldValue.getClass().isArray()) {
+ if (ArrayUtils.isEmpty((Object[]) splitFieldValue)) {
+ if (isUsingOuter) {
+ next.add(
+ copySeaTunnelRow(
+ outRowType.getTotalFields(), row,
aliasFieldIndex, null));
+ }
+ return;
+ }
+ for (Object fieldValue : (Object[]) splitFieldValue) {
+ Object value = keepValueType ? fieldValue :
String.valueOf(fieldValue);
Review Comment:
if the fieldValue is null, it will be convert to `"null"` by
`String.valueOf(fieldValue)`. The value will be wrong.
##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java:
##########
@@ -665,4 +699,180 @@ private Object executeBinaryExpr(BinaryExpression
binaryExpression, Object[] inp
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
String.format("Unsupported SQL Expression: %s ",
binaryExpression));
}
+
+ public List<SeaTunnelRow> lateralView(
+ List<SeaTunnelRow> seaTunnelRows,
+ List<LateralView> lateralViews,
+ SeaTunnelRowType outRowType) {
+ for (LateralView lateralView : lateralViews) {
+ Function function = lateralView.getGeneratorFunction();
+ boolean isUsingOuter = lateralView.isUsingOuter();
+ String functionName = function.getName();
+ String alias = lateralView.getColumnAlias().getName();
+ if (EXPLODE.equalsIgnoreCase(functionName)) {
+ seaTunnelRows = explode(seaTunnelRows, function, outRowType,
isUsingOuter, alias);
+ } else {
+ throw new SeaTunnelRuntimeException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ "Transform config error! UnSupport function:" +
functionName);
+ }
+ }
+
+ return seaTunnelRows;
+ }
+
+ private List<SeaTunnelRow> explode(
+ List<SeaTunnelRow> seaTunnelRows,
+ Function lateralViewFunction,
+ SeaTunnelRowType outRowType,
+ boolean isUsingOuter,
+ String alias) {
+ ExpressionList<?> expressions = lateralViewFunction.getParameters();
+ int aliasFieldIndex = outRowType.indexOf(alias);
+ for (Expression expression : expressions) {
+ if (expression instanceof Column) {
+ String column = ((Column) expression).getColumnName();
+ List<SeaTunnelRow> next = new ArrayList<>();
+ for (SeaTunnelRow row : seaTunnelRows) {
+ int fieldIndex = outRowType.indexOf(column);
+ Object splitFieldValue = row.getField(fieldIndex);
+ transformExplodeValue(
+ splitFieldValue,
+ outRowType,
+ isUsingOuter,
+ next,
+ aliasFieldIndex,
+ row,
+ expression,
+ true);
+ }
+ seaTunnelRows = next;
+ } else if (expression instanceof Function) {
+ List<SeaTunnelRow> next = new ArrayList<>();
+ for (SeaTunnelRow row : seaTunnelRows) {
+ Object splitFieldValue = computeForValue(expression,
row.getFields());
+ transformExplodeValue(
+ splitFieldValue,
+ outRowType,
+ isUsingOuter,
+ next,
+ aliasFieldIndex,
+ row,
+ expression,
+ false);
+ }
+ seaTunnelRows = next;
+ }
+ }
+ return seaTunnelRows;
+ }
+
+ private void transformExplodeValue(
+ Object splitFieldValue,
+ SeaTunnelRowType outRowType,
+ boolean isUsingOuter,
+ List<SeaTunnelRow> next,
+ int aliasFieldIndex,
+ SeaTunnelRow row,
+ Expression expression,
+ boolean keepValueType) {
+ if (splitFieldValue == null) {
+ if (isUsingOuter) {
+ next.add(copySeaTunnelRow(outRowType.getTotalFields(), row,
aliasFieldIndex, null));
+ }
+ return;
+ }
+ if (splitFieldValue.getClass().isArray()) {
+ if (ArrayUtils.isEmpty((Object[]) splitFieldValue)) {
+ if (isUsingOuter) {
+ next.add(
+ copySeaTunnelRow(
+ outRowType.getTotalFields(), row,
aliasFieldIndex, null));
+ }
+ return;
+ }
+ for (Object fieldValue : (Object[]) splitFieldValue) {
+ Object value = keepValueType ? fieldValue :
String.valueOf(fieldValue);
+ next.add(
+ copySeaTunnelRow(outRowType.getTotalFields(), row,
aliasFieldIndex, value));
+ }
+ } else {
+ throw new SeaTunnelRuntimeException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ "Transform config error! UnSupport explode function:"
+ + ((Function) expression).getName());
+ }
+ }
+
+ private SeaTunnelRow copySeaTunnelRow(
Review Comment:
```suggestion
private SeaTunnelRow copySeaTunnelRowWithNewValue(
```
##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java:
##########
@@ -665,4 +699,180 @@ private Object executeBinaryExpr(BinaryExpression
binaryExpression, Object[] inp
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
String.format("Unsupported SQL Expression: %s ",
binaryExpression));
}
+
+ public List<SeaTunnelRow> lateralView(
+ List<SeaTunnelRow> seaTunnelRows,
+ List<LateralView> lateralViews,
+ SeaTunnelRowType outRowType) {
+ for (LateralView lateralView : lateralViews) {
+ Function function = lateralView.getGeneratorFunction();
+ boolean isUsingOuter = lateralView.isUsingOuter();
+ String functionName = function.getName();
+ String alias = lateralView.getColumnAlias().getName();
Review Comment:
What if the sql function without `as`:
```
SELECT * FROM fake
LATERAL VIEW EXPLODE ( SPLIT ( NAME, ',' ) )
```
The result would be?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]