hanyuzheng7 commented on code in PR #22717: URL: https://github.com/apache/flink/pull/22717#discussion_r1223414204
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_CONCAT}. */ +@Internal +public class ArrayConcatFunction extends BuiltInScalarFunction { + private final ArrayData.ElementGetter elementGetter; + + public ArrayConcatFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.ARRAY_CONCAT, context); + final DataType dataType = + ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) + .getElementDataType(); + elementGetter = ArrayData.createElementGetter(dataType.getLogicalType()); + } + + public @Nullable ArrayData eval(ArrayData... arrays) { + if (arrays == null || arrays.length == 0) { + return null; + } + try { Review Comment: fixed it ########## flink-python/docs/reference/pyflink.table/expressions.rst: ########## @@ -231,6 +231,7 @@ advanced type helper functions Expression.array_remove Expression.array_reverse Expression.array_union + Expression.array_concat Review Comment: It has been fixed, thanks. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java: ########## @@ -1407,6 +1409,44 @@ public OutType arrayUnion(InType array) { unresolvedCall(ARRAY_UNION, toExpr(), objectToExpression(array))); } + /** + * Returns an array of the elements in the concat of array1 and array2, without duplicates. + * + * <p>If both of the array are null, the function will return null. + */ + public OutType arrayConcat(InType... arrays) { + arrays = convertToArrays(arrays); + Expression[] args = + Stream.concat( + Stream.of(toExpr()), + Arrays.stream(arrays).map(ApiExpressionUtils::objectToExpression)) + .toArray(Expression[]::new); + return toApiSpecificExpression(unresolvedCall(ARRAY_CONCAT, args)); + } + + private InType[] convertToArrays(InType[] arrays) { Review Comment: Fixed it. Thank you!. Because In Java, the varargs ... treats a single argument as a standalone argument rather than an array of arguments. Therefore, If I pass an argument [[1,2,3]] to a method that accepts InType... arrays, Java will treat it as [1,2,3], not [[1,2,3]]. If I pass array[[1,2,3]] to the Intype... this time arrays[0] is not [1,2,3] it will become 1, and when you run the test cases, if will from array_concat(f0, array[1,2,3]) become to array_concat(f0, 1, 2, 3), but I want to concat f0 and array[1,2,3] but not concat f0 with 1, 2, 3 ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java: ########## @@ -1407,6 +1409,44 @@ public OutType arrayUnion(InType array) { unresolvedCall(ARRAY_UNION, toExpr(), objectToExpression(array))); } + /** + * Returns an array of the elements in the concat of array1 and array2, without duplicates. + * + * <p>If both of the array are null, the function will return null. + */ + public OutType arrayConcat(InType... arrays) { + arrays = convertToArrays(arrays); + Expression[] args = + Stream.concat( + Stream.of(toExpr()), + Arrays.stream(arrays).map(ApiExpressionUtils::objectToExpression)) + .toArray(Expression[]::new); + return toApiSpecificExpression(unresolvedCall(ARRAY_CONCAT, args)); + } + + private InType[] convertToArrays(InType[] arrays) { + if (arrays == null || arrays.length < 1) { + throw new ValidationException("need at least two arrays"); + } + int numberOfNull = 0; + InType notNullArray = null; + for (int i = 0; i < arrays.length; ++i) { + if (arrays[i] == null) { + numberOfNull++; + } else { + notNullArray = arrays[i]; Review Comment: It has been fixed, thanks. ########## docs/data/sql_functions.yml: ########## @@ -646,6 +646,9 @@ collection: - sql: ARRAY_UNION(array1, array2) table: haystack.arrayUnion(array) description: Returns an array of the elements in the union of array1 and array2, without duplicates. If any of the array is null, the function will return null. + - sql: ARRAY_CONCAT(array1, ...) + table: array1.arrayConcat(array1, ...) Review Comment: It has been fixed, thanks. ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_CONCAT}. */ +@Internal +public class ArrayConcatFunction extends BuiltInScalarFunction { + private final ArrayData.ElementGetter elementGetter; + + public ArrayConcatFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.ARRAY_CONCAT, context); + final DataType dataType = + ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) + .getElementDataType(); + elementGetter = ArrayData.createElementGetter(dataType.getLogicalType()); + } + + public @Nullable ArrayData eval(ArrayData... arrays) { + if (arrays == null || arrays.length == 0) { + return null; + } + if (arrays.length == 1) { + return arrays[0]; + } + try { + List<Object> list = new ArrayList<>(); + for (ArrayData array : arrays) { + if (array != null) { + appendElements(array, elementGetter, list); + } + } + return new GenericArrayData(list.toArray()); + } catch (Throwable t) { + throw new FlinkRuntimeException(t); + } + } + + void appendElements(ArrayData array, ArrayData.ElementGetter elementGetter, List<Object> list) + throws Throwable { + for (int i = 0; i < array.size(); ++i) { + final Object element = elementGetter.getElementOrNull(array, i); + list.add(element); Review Comment: @bvarghese1 now the return definition change to The function returns NULL if any input argument is NULL. accounting to the https://cloud.google.com/bigquery/docs/reference/standard-sql/array_functions#array_concat `ARRAY_CONCAT ARRAY_CONCAT(array_expression[, ...]) Description Concatenates one or more arrays with the same element type into a single array. The function returns NULL if any input argument is NULL.` ########## docs/data/sql_functions.yml: ########## @@ -646,6 +646,9 @@ collection: - sql: ARRAY_UNION(array1, array2) table: haystack.arrayUnion(array) description: Returns an array of the elements in the union of array1 and array2, without duplicates. If any of the array is null, the function will return null. + - sql: ARRAY_CONCAT(array1, array2) + table: array1.arrayConcat(array2) + description: Returns an array of the elements in the concat of array1 and array2, allow duplicates. If both of the array are null, the function will return null. Review Comment: It has been fixed, thanks. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java: ########## @@ -1407,6 +1409,44 @@ public OutType arrayUnion(InType array) { unresolvedCall(ARRAY_UNION, toExpr(), objectToExpression(array))); } + /** + * Returns an array of the elements in the concat of array1 and array2, without duplicates. + * + * <p>If both of the array are null, the function will return null. + */ + public OutType arrayConcat(InType... arrays) { + arrays = convertToArrays(arrays); + Expression[] args = + Stream.concat( + Stream.of(toExpr()), + Arrays.stream(arrays).map(ApiExpressionUtils::objectToExpression)) + .toArray(Expression[]::new); + return toApiSpecificExpression(unresolvedCall(ARRAY_CONCAT, args)); + } + + private InType[] convertToArrays(InType[] arrays) { + if (arrays == null || arrays.length < 1) { + throw new ValidationException("need at least two arrays"); + } + int numberOfNull = 0; + InType notNullArray = null; + for (int i = 0; i < arrays.length; ++i) { + if (arrays[i] == null) { + numberOfNull++; + } else { + notNullArray = arrays[i]; Review Comment: If I want concat(f0, array[null, null, 1]), the InType... arrays is null, null, 1, in this situation, I want to transfer null, null, 1 to an array[null, null, null] the notNullArray is use to check whether input is array or only an element ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java: ########## @@ -1407,6 +1409,44 @@ public OutType arrayUnion(InType array) { unresolvedCall(ARRAY_UNION, toExpr(), objectToExpression(array))); } + /** + * Returns an array of the elements in the concat of array1 and array2, without duplicates. + * + * <p>If both of the array are null, the function will return null. + */ + public OutType arrayConcat(InType... arrays) { + arrays = convertToArrays(arrays); + Expression[] args = + Stream.concat( + Stream.of(toExpr()), + Arrays.stream(arrays).map(ApiExpressionUtils::objectToExpression)) + .toArray(Expression[]::new); + return toApiSpecificExpression(unresolvedCall(ARRAY_CONCAT, args)); + } + + private InType[] convertToArrays(InType[] arrays) { + if (arrays == null || arrays.length < 1) { + throw new ValidationException("need at least two arrays"); Review Comment: fixed it. ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_CONCAT}. */ +@Internal +public class ArrayConcatFunction extends BuiltInScalarFunction { + private final ArrayData.ElementGetter elementGetter; + + public ArrayConcatFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.ARRAY_CONCAT, context); + final DataType dataType = + ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) + .getElementDataType(); + elementGetter = ArrayData.createElementGetter(dataType.getLogicalType()); + } + + public @Nullable ArrayData eval(ArrayData... arrays) { + if (arrays == null || arrays.length == 0) { + return null; + } + if (arrays.length == 1) { + return arrays[0]; + } + try { + List<Object> list = new ArrayList<>(); + for (ArrayData array : arrays) { + if (array != null) { + appendElements(array, elementGetter, list); + } + } + return new GenericArrayData(list.toArray()); + } catch (Throwable t) { + throw new FlinkRuntimeException(t); + } + } + + void appendElements(ArrayData array, ArrayData.ElementGetter elementGetter, List<Object> list) + throws Throwable { + for (int i = 0; i < array.size(); ++i) { + final Object element = elementGetter.getElementOrNull(array, i); + list.add(element); Review Comment: @snuyanzin if it exist null in an array, such as array_concat(array[1,2, null], [2,2,null]) -> result[1,2,null,2,2,null], we should add null in the result ########## docs/data/sql_functions.yml: ########## @@ -646,6 +646,9 @@ collection: - sql: ARRAY_UNION(array1, array2) table: haystack.arrayUnion(array) description: Returns an array of the elements in the union of array1 and array2, without duplicates. If any of the array is null, the function will return null. + - sql: ARRAY_CONCAT(array1, array2, tail...) + table: array1.arrayConcat(array1, array2, tail...) + description: Returns an array of the elements in the concat of at least two arrays, allow duplicates. If all of the arrays are null, the function will return null. Review Comment: Fixed it. Thank you! ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_CONCAT}. */ +@Internal +public class ArrayConcatFunction extends BuiltInScalarFunction { + private final ArrayData.ElementGetter elementGetter; + + public ArrayConcatFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.ARRAY_CONCAT, context); + final DataType dataType = + ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) + .getElementDataType(); + elementGetter = ArrayData.createElementGetter(dataType.getLogicalType()); + } + + public @Nullable ArrayData eval(ArrayData... arrays) { + if (arrays == null || arrays.length == 0) { + return null; + } + if (arrays.length == 1) { + return arrays[0]; + } + try { + List<Object> list = new ArrayList<>(); + for (ArrayData array : arrays) { + if (array != null) { + appendElements(array, elementGetter, list); + } + } + return new GenericArrayData(list.toArray()); + } catch (Throwable t) { + throw new FlinkRuntimeException(t); + } + } + + void appendElements(ArrayData array, ArrayData.ElementGetter elementGetter, List<Object> list) + throws Throwable { + for (int i = 0; i < array.size(); ++i) { + final Object element = elementGetter.getElementOrNull(array, i); + list.add(element); Review Comment: This will not happen because, if there is a null case, the system will throw an exception, according to your proposal, I borrowed https://cloud.google.com/bigquery/docs/reference/standard-sql/array_functions#array_concat,array_concat concatenates one or more arrays with the same element type into a single array. The function returns NULL if any input argument is NULL. Because now the function returns NULL if any input argument is NULL, so I add a else block, if the array is null return null. ########## docs/data/sql_functions.yml: ########## @@ -646,6 +646,9 @@ collection: - sql: ARRAY_UNION(array1, array2) table: haystack.arrayUnion(array) description: Returns an array of the elements in the union of array1 and array2, without duplicates. If any of the array is null, the function will return null. + - sql: ARRAY_CONCAT(array1, ...) + table: array1.arrayConcat(array1, ...) + description: Returns an array of the elements in the concat of at least one array. If all of the arrays are null, the function will return null. Review Comment: It has been fixed, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
