[
https://issues.apache.org/jira/browse/BEAM-5852?focusedWorklogId=164104&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-164104
]
ASF GitHub Bot logged work on BEAM-5852:
----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Nov/18 22:01
Start Date: 08/Nov/18 22:01
Worklog Time Spent: 10m
Work Description: akedin closed pull request #6928: [BEAM-5852] BeamSQL
functions
URL: https://github.com/apache/beam/pull/6928
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index 3e442fe7c11..c8c1cf615b6 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -132,6 +132,7 @@ public boolean accept() {
case CHAR:
case VARCHAR:
return value instanceof String || value instanceof NlsString;
+ case BINARY:
case VARBINARY:
return value instanceof byte[] || value instanceof ByteString;
case TIME:
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
new file mode 100644
index 00000000000..ce27a02bc53
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.udf;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.auto.service.AutoService;
+import java.util.Arrays;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+/** BuiltinStringFunctions. */
+@AutoService(BeamBuiltinFunctionProvider.class)
+public class BuiltinStringFunctions extends BeamBuiltinFunctionProvider {
+
+ // return a explicitly null for Boolean has NP_BOOLEAN_RETURN_NULL warning.
+ // return null for boolean is not allowed.
+ // TODO: handle null input.
+ @UDF(
+ funcName = "ENDS_WITH",
+ parameterArray = {TypeName.STRING},
+ returnType = TypeName.STRING
+ )
+ public Boolean endsWith(String str1, String str2) {
+ return str1.endsWith(str2);
+ }
+
+ // return a explicitly null for Boolean has NP_BOOLEAN_RETURN_NULL warning.
+ // return null for boolean is not allowed.
+ // TODO: handle null input.
+ @UDF(
+ funcName = "STARTS_WITH",
+ parameterArray = {TypeName.STRING},
+ returnType = TypeName.STRING
+ )
+ public Boolean startsWith(String str1, String str2) {
+ return str1.startsWith(str2);
+ }
+
+ @UDF(
+ funcName = "LENGTH",
+ parameterArray = {TypeName.STRING},
+ returnType = TypeName.INT64
+ )
+ public Long length(String str) {
+ if (str == null) {
+ return null;
+ }
+ return (long) str.length();
+ }
+
+ @UDF(
+ funcName = "LENGTH",
+ parameterArray = {TypeName.BYTES},
+ returnType = TypeName.INT64
+ )
+ public Long length(byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ return (long) bytes.length;
+ }
+
+ @UDF(
+ funcName = "REVERSE",
+ parameterArray = {TypeName.STRING},
+ returnType = TypeName.STRING
+ )
+ public String reverse(String str) {
+ if (str == null) {
+ return null;
+ }
+ return new StringBuilder(str).reverse().toString();
+ }
+
+ @UDF(
+ funcName = "REVERSE",
+ parameterArray = {TypeName.BYTES},
+ returnType = TypeName.BYTES
+ )
+ public byte[] reverse(byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ byte[] ret = Arrays.copyOf(bytes, bytes.length);
+ ArrayUtils.reverse(ret);
+ return ret;
+ }
+
+ @UDF(
+ funcName = "FROM_HEX",
+ parameterArray = {TypeName.STRING},
+ returnType = TypeName.BYTES
+ )
+ public byte[] fromHex(String str) throws DecoderException {
+ if (str == null) {
+ return null;
+ }
+
+ return Hex.decodeHex(str.toCharArray());
+ }
+
+ @UDF(
+ funcName = "TO_HEX",
+ parameterArray = {TypeName.BYTES},
+ returnType = TypeName.STRING
+ )
+ public String toHex(byte[] bytes) throws DecoderException {
+ if (bytes == null) {
+ return null;
+ }
+
+ return Hex.encodeHexString(bytes);
+ }
+
+ @UDF(
+ funcName = "LPAD",
+ parameterArray = {TypeName.STRING, TypeName.INT64},
+ returnType = TypeName.STRING
+ )
+ public String lpad(String originalValue, Long returnLength) {
+ return lpad(originalValue, returnLength, " ");
+ }
+
+ @UDF(
+ funcName = "LPAD",
+ parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING},
+ returnType = TypeName.STRING
+ )
+ public String lpad(String originalValue, Long returnLength, String pattern) {
+ if (originalValue == null || returnLength == null || pattern == null) {
+ return null;
+ }
+
+ if (returnLength < -1 || pattern.isEmpty()) {
+ throw new IllegalArgumentException("returnLength cannot be 0 or pattern
cannot be empty.");
+ }
+
+ if (originalValue.length() == returnLength) {
+ return originalValue;
+ } else if (originalValue.length() < returnLength) { // add padding to left
+ return StringUtils.leftPad(originalValue, Math.toIntExact(returnLength),
pattern);
+ } else { // truncating string by str.substring
+ // Java String can only hold a string with Integer.MAX_VALUE as longest
length.
+ return originalValue.substring(0, Math.toIntExact(returnLength));
+ }
+ }
+
+ @UDF(
+ funcName = "LPAD",
+ parameterArray = {TypeName.BYTES, TypeName.INT64},
+ returnType = TypeName.BYTES
+ )
+ public byte[] lpad(byte[] originalValue, Long returnLength) {
+ return lpad(originalValue, returnLength, " ".getBytes(UTF_8));
+ }
+
+ @UDF(
+ funcName = "LPAD",
+ parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES},
+ returnType = TypeName.BYTES
+ )
+ public byte[] lpad(byte[] originalValue, Long returnLength, byte[] pattern) {
+ if (originalValue == null || returnLength == null || pattern == null) {
+ return null;
+ }
+ if (returnLength < -1 || pattern.length == 0) {
+ throw new IllegalArgumentException("returnLength cannot be 0 or pattern
cannot be empty.");
+ }
+
+ int returnLengthInt = Math.toIntExact(returnLength);
+
+ if (originalValue.length == returnLengthInt) {
+ return originalValue;
+ } else if (originalValue.length < returnLengthInt) { // add padding to left
+ byte[] ret = new byte[returnLengthInt];
+ // step one: pad #(returnLengthInt - originalValue.length) bytes to left
side.
+ int paddingOff = 0;
+ int paddingLeftBytes = returnLengthInt - originalValue.length;
+ byteArrayPadding(ret, pattern, paddingOff, paddingLeftBytes);
+
+ // step two: copy originalValue.
+ System.arraycopy(
+ originalValue, 0, ret, returnLengthInt - originalValue.length,
originalValue.length);
+ return ret;
+ } else { // truncating string by str.substring
+ // Java String can only hold a string with Integer.MAX_VALUE as longest
length.
+ byte[] ret = new byte[returnLengthInt];
+ System.arraycopy(originalValue, 0, ret, 0, returnLengthInt);
+ return ret;
+ }
+ }
+
+ @UDF(
+ funcName = "RPAD",
+ parameterArray = {TypeName.STRING, TypeName.INT64},
+ returnType = TypeName.STRING
+ )
+ public String rpad(String originalValue, Long returnLength) {
+ return lpad(originalValue, returnLength, " ");
+ }
+
+ @UDF(
+ funcName = "RPAD",
+ parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING},
+ returnType = TypeName.STRING
+ )
+ public String rpad(String originalValue, Long returnLength, String pattern) {
+ if (originalValue == null || returnLength == null || pattern == null) {
+ return null;
+ }
+
+ if (returnLength < -1 || pattern.isEmpty()) {
+ throw new IllegalArgumentException("returnLength cannot be 0 or pattern
cannot be empty.");
+ }
+
+ if (originalValue.length() == returnLength) {
+ return originalValue;
+ } else if (originalValue.length() < returnLength) { // add padding to right
+ return StringUtils.rightPad(originalValue,
Math.toIntExact(returnLength), pattern);
+ } else { // truncating string by str.substring
+ // Java String can only hold a string with Integer.MAX_VALUE as longest
length.
+ return originalValue.substring(0, Math.toIntExact(returnLength));
+ }
+ }
+
+ @UDF(
+ funcName = "RPAD",
+ parameterArray = {TypeName.BYTES, TypeName.INT64},
+ returnType = TypeName.BYTES
+ )
+ public byte[] rpad(byte[] originalValue, Long returnLength) {
+ return lpad(originalValue, returnLength, " ".getBytes(UTF_8));
+ }
+
+ @UDF(
+ funcName = "RPAD",
+ parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES},
+ returnType = TypeName.BYTES
+ )
+ public byte[] rpad(byte[] originalValue, Long returnLength, byte[] pattern) {
+ if (originalValue == null || returnLength == null || pattern == null) {
+ return null;
+ }
+ if (returnLength < -1 || pattern.length == 0) {
+ throw new IllegalArgumentException("returnLength cannot be 0 or pattern
cannot be empty.");
+ }
+
+ int returnLengthInt = Math.toIntExact(returnLength);
+
+ if (originalValue.length == returnLengthInt) {
+ return originalValue;
+ } else if (originalValue.length < returnLengthInt) { // add padding to
right
+ byte[] ret = new byte[returnLengthInt];
+ // step one: copy originalValue.
+ System.arraycopy(originalValue, 0, ret, 0, originalValue.length);
+
+ // step one: pad #(returnLengthInt - originalValue.length) bytes to
right side.
+ int paddingOff = originalValue.length;
+ int paddingLeftBytes = returnLengthInt - originalValue.length;
+ byteArrayPadding(ret, pattern, paddingOff, paddingLeftBytes);
+ return ret;
+ } else { // truncating string by str.substring
+ // Java String can only hold a string with Integer.MAX_VALUE as longest
length.
+ byte[] ret = new byte[returnLengthInt];
+ System.arraycopy(originalValue, 0, ret, 0, returnLengthInt);
+ return ret;
+ }
+ }
+
+ private void byteArrayPadding(byte[] dest, byte[] pattern, int paddingOff,
int paddingLeftBytes) {
+ while (paddingLeftBytes > 0) {
+ if (paddingLeftBytes >= pattern.length) {
+ // pad the whole pattern
+ System.arraycopy(pattern, 0, dest, paddingOff, pattern.length);
+ paddingLeftBytes -= pattern.length;
+ paddingOff += pattern.length;
+ } else {
+ System.arraycopy(pattern, 0, dest, paddingOff, paddingLeftBytes);
+ paddingLeftBytes = 0;
+ }
+ }
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
index 83ee0574ef5..9049284d4db 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
@@ -36,6 +36,9 @@
returnType = Schema.TypeName.DOUBLE
)
public Double cosh(Double o) {
+ if (o == null) {
+ return null;
+ }
return Math.cosh(o);
}
@@ -51,6 +54,9 @@ public Double cosh(Double o) {
returnType = Schema.TypeName.DOUBLE
)
public Double sinh(Double o) {
+ if (o == null) {
+ return null;
+ }
return Math.sinh(o);
}
@@ -65,6 +71,9 @@ public Double sinh(Double o) {
returnType = Schema.TypeName.DOUBLE
)
public Double tanh(Double o) {
+ if (o == null) {
+ return null;
+ }
return Math.tanh(o);
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 96ad251b36b..f5c89a29573 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -26,6 +26,7 @@
import java.util.stream.IntStream;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.calcite.avatica.util.ByteString;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -204,6 +205,8 @@ public static RelDataType
sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Ty
//For Joda time types, return SQL type for java.util.Date.
if (rawType instanceof Class &&
ReadableInstant.class.isAssignableFrom((Class<?>) rawType)) {
return typeFactory.createJavaType(Date.class);
+ } else if (rawType instanceof Class &&
ByteString.class.isAssignableFrom((Class<?>) rawType)) {
+ return typeFactory.createJavaType(byte[].class);
}
return typeFactory.createJavaType((Class) rawType);
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index 97fce9247b1..6965b50b173 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -17,10 +17,13 @@
*/
package org.apache.beam.sdk.extensions.sql;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.math.BigDecimal;
import java.text.ParseException;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
@@ -52,13 +55,20 @@
static Schema schemaInTableA;
static Schema schemaFloatDouble;
+ static Schema schemaBytes;
+ static Schema schemaBytesPaddingTest;
+
static List<Row> rowsInTableA;
static List<Row> rowsOfFloatDouble;
+ static List<Row> rowsOfBytes;
+ static List<Row> rowsOfBytesPaddingTest;
//bounded PCollections
protected PCollection<Row> boundedInput1;
protected PCollection<Row> boundedInput2;
protected PCollection<Row> boundedInputFloatDouble;
+ protected PCollection<Row> boundedInputBytes;
+ protected PCollection<Row> boundedInputBytesPaddingTest;
//unbounded PCollections
protected PCollection<Row> unboundedInput1;
@@ -148,6 +158,62 @@ public static void prepareClass() throws ParseException {
Float.NaN,
Double.NaN)
.getRows();
+
+ schemaBytes =
Schema.builder().addStringField("f_func").addByteArrayField("f_bytes").build();
+
+ rowsOfBytes =
+ TestUtils.RowsBuilder.of(schemaBytes)
+ .addRows(
+ "LENGTH",
+ "".getBytes(UTF_8),
+ "LENGTH",
+ "абвгд".getBytes(UTF_8),
+ "LENGTH",
+ "\0\1".getBytes(UTF_8),
+ "TO_HEX",
+ "foobar".getBytes(UTF_8),
+ "TO_HEX",
+ " ".getBytes(UTF_8),
+ "TO_HEX",
+ "abcABC".getBytes(UTF_8),
+ "TO_HEX",
+ "abcABCжщфЖЩФ".getBytes(UTF_8))
+ .getRows();
+
+ schemaBytesPaddingTest =
+ Schema.builder()
+ .addNullableField("f_bytes_one", FieldType.BYTES)
+ .addNullableField("length", FieldType.INT64)
+ .addNullableField("f_bytes_two", FieldType.BYTES)
+ .build();
+ rowsOfBytesPaddingTest =
+ TestUtils.RowsBuilder.of(schemaBytesPaddingTest)
+ .addRows(
+ "abcdef".getBytes(UTF_8),
+ 0L,
+ "defgh".getBytes(UTF_8),
+ "abcdef".getBytes(UTF_8),
+ 6L,
+ "defgh".getBytes(UTF_8),
+ "abcdef".getBytes(UTF_8),
+ 4L,
+ "defgh".getBytes(UTF_8),
+ "abcdef".getBytes(UTF_8),
+ 10L,
+ "defgh".getBytes(UTF_8),
+ "abc".getBytes(UTF_8),
+ 10L,
+ "defgh".getBytes(UTF_8),
+ "abc".getBytes(UTF_8),
+ 7L,
+ "-".getBytes(UTF_8),
+ "".getBytes(UTF_8),
+ 7L,
+ "def".getBytes(UTF_8),
+ null,
+ null,
+ null)
+ .getRows();
}
@Before
@@ -179,6 +245,24 @@ public void preparePCollections() {
SerializableFunctions.identity(),
SerializableFunctions.identity()));
+ boundedInputBytes =
+ pipeline.apply(
+ "boundedInputBytes",
+ Create.of(rowsOfBytes)
+ .withSchema(
+ schemaBytes,
+ SerializableFunctions.identity(),
+ SerializableFunctions.identity()));
+
+ boundedInputBytesPaddingTest =
+ pipeline.apply(
+ "boundedInputBytesPaddingTest",
+ Create.of(rowsOfBytesPaddingTest)
+ .withSchema(
+ schemaBytesPaddingTest,
+ SerializableFunctions.identity(),
+ SerializableFunctions.identity()));
+
unboundedInput1 = prepareUnboundedPCollection1();
unboundedInput2 = prepareUnboundedPCollection2();
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSalUhfSpecialTypeAndValueTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSalUhfSpecialTypeAndValueTest.java
new file mode 100644
index 00000000000..ad59c8e2ff9
--- /dev/null
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSalUhfSpecialTypeAndValueTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.udf;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.beam.sdk.extensions.sql.BeamSqlDslBase;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for UDFs. */
+@RunWith(JUnit4.class)
+public class BeamSalUhfSpecialTypeAndValueTest extends BeamSqlDslBase {
+
+ @Test
+ public void testIsInf() throws Exception {
+ Schema resultType =
+ Schema.builder()
+ .addBooleanField("field_1")
+ .addBooleanField("field_2")
+ .addBooleanField("field_3")
+ .addBooleanField("field_4")
+ .build();
+ Row resultRow = Row.withSchema(resultType).addValues(true, true, true,
true).build();
+
+ String sql =
+ "SELECT IS_INF(f_float_1), IS_INF(f_double_1), IS_INF(f_float_2),
IS_INF(f_double_2) FROM PCOLLECTION";
+ PCollection<Row> result = boundedInputFloatDouble.apply("testUdf",
SqlTransform.query(sql));
+ PAssert.that(result).containsInAnyOrder(resultRow);
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testIsNan() throws Exception {
+ Schema resultType =
+ Schema.builder()
+ .addBooleanField("field_1")
+ .addBooleanField("field_2")
+ .addBooleanField("field_3")
+ .addBooleanField("field_4")
+ .build();
+ Row resultRow = Row.withSchema(resultType).addValues(false, false, true,
true).build();
+
+ String sql =
+ "SELECT IS_NAN(f_float_2), IS_NAN(f_double_2), IS_NAN(f_float_3),
IS_NAN(f_double_3) FROM PCOLLECTION";
+ PCollection<Row> result = boundedInputFloatDouble.apply("testUdf",
SqlTransform.query(sql));
+ PAssert.that(result).containsInAnyOrder(resultRow);
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testLength() throws Exception {
+ Schema resultType = Schema.builder().addInt64Field("field").build();
+ Row resultRow = Row.withSchema(resultType).addValues(10L).build();
+ Row resultRow2 = Row.withSchema(resultType).addValues(0L).build();
+ Row resultRow3 = Row.withSchema(resultType).addValues(2L).build();
+ String sql = "SELECT LENGTH(f_bytes) FROM PCOLLECTION WHERE f_func =
'LENGTH'";
+ PCollection<Row> result = boundedInputBytes.apply("testUdf",
SqlTransform.query(sql));
+ PAssert.that(result).containsInAnyOrder(resultRow, resultRow2, resultRow3);
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testReverse() throws Exception {
+ byte[] testByets = "абвгд".getBytes(UTF_8);
+ ArrayUtils.reverse(testByets);
+ Schema resultType = Schema.builder().addByteArrayField("field").build();
+ Row resultRow = Row.withSchema(resultType).addValues(testByets).build();
+ Row resultRow2 =
Row.withSchema(resultType).addValues("\1\0".getBytes(UTF_8)).build();
+ Row resultRow3 =
Row.withSchema(resultType).addValues("".getBytes(UTF_8)).build();
+ String sql = "SELECT REVERSE(f_bytes) FROM PCOLLECTION WHERE f_func =
'LENGTH'";
+ PCollection<Row> result = boundedInputBytes.apply("testUdf",
SqlTransform.query(sql));
+ PAssert.that(result).containsInAnyOrder(resultRow, resultRow2, resultRow3);
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testToHex() throws Exception {
+ Schema resultType = Schema.builder().addStringField("field").build();
+ Row resultRow =
Row.withSchema(resultType).addValue("666f6f626172").build();
+ Row resultRow2 = Row.withSchema(resultType).addValue("20").build();
+ Row resultRow3 =
Row.withSchema(resultType).addValue("616263414243").build();
+ Row resultRow4 =
+
Row.withSchema(resultType).addValue("616263414243d0b6d189d184d096d0a9d0a4").build();
+
+ String sql = "SELECT TO_HEX(f_bytes) FROM PCOLLECTION WHERE f_func =
'TO_HEX'";
+ PCollection<Row> result = boundedInputBytes.apply("testUdf",
SqlTransform.query(sql));
+ PAssert.that(result).containsInAnyOrder(resultRow, resultRow2, resultRow3,
resultRow4);
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testLeftPad() throws Exception {
+ Schema resultType = Schema.builder().addNullableField("field",
FieldType.BYTES).build();
+ Row resultRow =
Row.withSchema(resultType).addValue("".getBytes(UTF_8)).build();
+ Row resultRow2 =
Row.withSchema(resultType).addValue("abcdef".getBytes(UTF_8)).build();
+ Row resultRow3 =
Row.withSchema(resultType).addValue("abcd".getBytes(UTF_8)).build();
+ Row resultRow4 =
Row.withSchema(resultType).addValue("defgabcdef".getBytes(UTF_8)).build();
+ Row resultRow5 =
Row.withSchema(resultType).addValue("defghdeabc".getBytes(UTF_8)).build();
+ Row resultRow6 =
Row.withSchema(resultType).addValue("----abc".getBytes(UTF_8)).build();
+ Row resultRow7 =
Row.withSchema(resultType).addValue("defdefd".getBytes(UTF_8)).build();
+ Row resultRow8 = Row.withSchema(resultType).addValue(null).build();
+
+ String sql = "SELECT LPAD(f_bytes_one, length, f_bytes_two) FROM
PCOLLECTION";
+ PCollection<Row> result =
+ boundedInputBytesPaddingTest.apply("testUdf", SqlTransform.query(sql));
+ PAssert.that(result)
+ .containsInAnyOrder(
+ resultRow,
+ resultRow2,
+ resultRow3,
+ resultRow4,
+ resultRow5,
+ resultRow6,
+ resultRow7,
+ resultRow8);
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testRightPad() throws Exception {
+ Schema resultType = Schema.builder().addNullableField("field",
FieldType.BYTES).build();
+ Row resultRow =
Row.withSchema(resultType).addValue("".getBytes(UTF_8)).build();
+ Row resultRow2 =
Row.withSchema(resultType).addValue("abcdef".getBytes(UTF_8)).build();
+ Row resultRow3 =
Row.withSchema(resultType).addValue("abcd".getBytes(UTF_8)).build();
+ Row resultRow4 =
Row.withSchema(resultType).addValue("abcdefdefg".getBytes(UTF_8)).build();
+ Row resultRow5 =
Row.withSchema(resultType).addValue("abcdefghde".getBytes(UTF_8)).build();
+ Row resultRow6 =
Row.withSchema(resultType).addValue("abc----".getBytes(UTF_8)).build();
+ Row resultRow7 =
Row.withSchema(resultType).addValue("defdefd".getBytes(UTF_8)).build();
+ Row resultRow8 = Row.withSchema(resultType).addValue(null).build();
+
+ String sql = "SELECT RPAD(f_bytes_one, length, f_bytes_two) FROM
PCOLLECTION";
+ PCollection<Row> result =
+ boundedInputBytesPaddingTest.apply("testUdf", SqlTransform.query(sql));
+ PAssert.that(result)
+ .containsInAnyOrder(
+ resultRow,
+ resultRow2,
+ resultRow3,
+ resultRow4,
+ resultRow5,
+ resultRow6,
+ resultRow7,
+ resultRow8);
+ pipeline.run().waitUntilFinish();
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfExpressionTest.java
new file mode 100644
index 00000000000..82ffccf9fb6
--- /dev/null
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfExpressionTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.udf;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import
org.apache.beam.sdk.extensions.sql.integrationtest.BeamSqlBuiltinFunctionsIntegrationTestBase;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for UDFs. */
+@RunWith(JUnit4.class)
+public class BeamSqlUdfExpressionTest extends
BeamSqlBuiltinFunctionsIntegrationTestBase {
+
+ @Test
+ public void testCOSH() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("COSH(CAST(1.0 as DOUBLE))", Math.cosh(1.0))
+ .addExpr("COSH(CAST(710.0 as DOUBLE))", Math.cosh(710.0))
+ .addExpr("COSH(CAST(-1.0 as DOUBLE))", Math.cosh(-1.0))
+ .addExprWithNullExpectedValue("COSH(CAST(NULL as DOUBLE))",
TypeName.DOUBLE);
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testSINH() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("SINH(CAST(1.0 as DOUBLE))", Math.sinh(1.0))
+ .addExpr("SINH(CAST(710.0 as DOUBLE))", Math.sinh(710.0))
+ .addExpr("SINH(CAST(-1.0 as DOUBLE))", Math.sinh(-1.0))
+ .addExprWithNullExpectedValue("SINH(CAST(NULL as DOUBLE))",
TypeName.DOUBLE);
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testTANH() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("TANH(CAST(1.0 as DOUBLE))", Math.tanh(1.0))
+ .addExpr("TANH(CAST(0.0 as DOUBLE))", Math.tanh(0.0))
+ .addExpr("TANH(CAST(-1.0 as DOUBLE))", Math.tanh(-1.0))
+ .addExprWithNullExpectedValue("TANH(CAST(NULL as DOUBLE))",
TypeName.DOUBLE);
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testEndsWith() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("ENDS_WITH('string1', 'g1')", true)
+ .addExpr("ENDS_WITH('string2', 'g1')", false)
+ .addExpr("ENDS_WITH('', '')", true)
+ .addExpr("ENDS_WITH('中文', '文')", true)
+ .addExpr("ENDS_WITH('中文', '中')", false);
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testStartsWith() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("STARTS_WITH('string1', 'stri')", true)
+ .addExpr("STARTS_WITH('string2', 'str1')", false)
+ .addExpr("STARTS_WITH('', '')", true)
+ .addExpr("STARTS_WITH('中文', '文')", false)
+ .addExpr("STARTS_WITH('中文', '中')", true);
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testLength() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("LENGTH('')", 0L)
+ .addExpr("LENGTH('abcde')", 5L)
+ .addExpr("LENGTH('中文')", 2L)
+ .addExpr("LENGTH('\0\0')", 2L)
+ .addExpr("LENGTH('абвгд')", 5L)
+ .addExprWithNullExpectedValue("LENGTH(CAST(NULL as VARCHAR(0)))",
TypeName.INT64)
+ .addExprWithNullExpectedValue("LENGTH(CAST(NULL as
VARBINARY(0)))", TypeName.INT64);
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testReverse() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("REVERSE('')", "")
+ .addExpr("REVERSE('foo')", "oof")
+ .addExpr("REVERSE('中文')", "文中")
+ .addExpr("REVERSE('абвгд')", "дгвба")
+ .addExprWithNullExpectedValue("REVERSE(CAST(NULL as VARCHAR(0)))",
TypeName.STRING)
+ .addExprWithNullExpectedValue("REVERSE(CAST(NULL as
VARBINARY(0)))", TypeName.STRING);
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testFromHex() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("FROM_HEX('666f6f626172')", "foobar".getBytes(UTF_8))
+ .addExpr("FROM_HEX('20')", " ".getBytes(UTF_8))
+ .addExpr("FROM_HEX('616263414243')", "abcABC".getBytes(UTF_8))
+ .addExpr(
+ "FROM_HEX('616263414243d0b6d189d184d096d0a9d0a4')",
"abcABCжщфЖЩФ".getBytes(UTF_8))
+ .addExprWithNullExpectedValue("FROM_HEX(CAST(NULL as
VARCHAR(0)))", TypeName.BYTES);
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testToHex() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExprWithNullExpectedValue("TO_HEX(CAST(NULL as
VARBINARY(0)))", TypeName.STRING);
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testLeftPad() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("LPAD('abcdef', CAST(0 AS BIGINT))", "")
+ .addExpr("LPAD('abcdef', CAST(0 AS BIGINT), 'defgh')", "")
+ .addExpr("LPAD('abcdef', CAST(6 AS BIGINT), 'defgh')", "abcdef")
+ .addExpr("LPAD('abcdef', CAST(5 AS BIGINT), 'defgh')", "abcde")
+ .addExpr("LPAD('abcdef', CAST(4 AS BIGINT), 'defgh')", "abcd")
+ .addExpr("LPAD('abcdef', CAST(3 AS BIGINT), 'defgh')", "abc")
+ .addExpr("LPAD('abc', CAST(4 AS BIGINT), 'defg')", "dabc")
+ .addExpr("LPAD('abc', CAST(5 AS BIGINT), 'defgh')", "deabc")
+ .addExpr("LPAD('abc', CAST(6 AS BIGINT), 'defgh')", "defabc")
+ .addExpr("LPAD('abc', CAST(7 AS BIGINT), 'defg')", "defgabc")
+ .addExpr("LPAD('abcd', CAST(10 AS BIGINT), 'defg')", "defgdeabcd")
+ .addExpr("LPAD('中文', CAST(10 AS BIGINT), 'жщфЖЩФ')", "жщфЖЩФжщ中文")
+ .addExpr("LPAD('', CAST(5 AS BIGINT), ' ')", " ")
+ .addExpr("LPAD('', CAST(3 AS BIGINT), '-')", "---")
+ .addExpr("LPAD('a', CAST(5 AS BIGINT), ' ')", " a")
+ .addExpr("LPAD('a', CAST(3 AS BIGINT), '-')", "--a")
+ .addExprWithNullExpectedValue(
+ "LPAD(CAST(NULL AS VARCHAR(0)), CAST(3 AS BIGINT), '-')",
TypeName.STRING)
+ .addExprWithNullExpectedValue("LPAD('', CAST(NULL AS BIGINT),
'-')", TypeName.STRING)
+ .addExprWithNullExpectedValue(
+ "LPAD('', CAST(3 AS BIGINT), CAST(NULL AS VARCHAR(0)))",
TypeName.STRING);
+
+ checker.buildRunAndCheck();
+ }
+
+ @Test
+ public void testRightPad() throws Exception {
+ ExpressionChecker checker =
+ new ExpressionChecker()
+ .addExpr("RPAD('abcdef', CAST(0 AS BIGINT))", "")
+ .addExpr("RPAD('abcdef', CAST(0 AS BIGINT), 'defgh')", "")
+ .addExpr("RPAD('abcdef', CAST(6 AS BIGINT), 'defgh')", "abcdef")
+ .addExpr("RPAD('abcdef', CAST(5 AS BIGINT), 'defgh')", "abcde")
+ .addExpr("RPAD('abcdef', CAST(4 AS BIGINT), 'defgh')", "abcd")
+ .addExpr("RPAD('abcdef', CAST(3 AS BIGINT), 'defgh')", "abc")
+ .addExpr("RPAD('abc', CAST(4 AS BIGINT), 'defg')", "abcd")
+ .addExpr("RPAD('abc', CAST(5 AS BIGINT), 'defgh')", "abcde")
+ .addExpr("RPAD('abc', CAST(6 AS BIGINT), 'defgh')", "abcdef")
+ .addExpr("RPAD('abc', CAST(7 AS BIGINT), 'defg')", "abcdefg")
+ .addExpr("RPAD('abcd', CAST(10 AS BIGINT), 'defg')", "abcddefgde")
+ .addExpr("RPAD('中文', CAST(10 AS BIGINT), 'жщфЖЩФ')", "中文жщфЖЩФжщ")
+ .addExpr("RPAD('', CAST(5 AS BIGINT), ' ')", " ")
+ .addExpr("RPAD('', CAST(3 AS BIGINT), '-')", "---")
+ .addExpr("RPAD('a', CAST(5 AS BIGINT), ' ')", "a ")
+ .addExpr("RPAD('a', CAST(3 AS BIGINT), '-')", "a--")
+ .addExprWithNullExpectedValue(
+ "RPAD(CAST(NULL AS VARCHAR(0)), CAST(3 AS BIGINT), '-')",
TypeName.STRING)
+ .addExprWithNullExpectedValue("RPAD('', CAST(NULL AS BIGINT),
'-')", TypeName.STRING)
+ .addExprWithNullExpectedValue(
+ "RPAD('', CAST(3 AS BIGINT), CAST(NULL AS VARCHAR(0)))",
TypeName.STRING);
+
+ checker.buildRunAndCheck();
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfImplTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfImplTest.java
deleted file mode 100644
index 753cac8ae7b..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfImplTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl.udf;
-
-import org.apache.beam.sdk.extensions.sql.BeamSqlDslBase;
-import org.apache.beam.sdk.extensions.sql.SqlTransform;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for UDFs. */
-@RunWith(JUnit4.class)
-public class BeamSqlUdfImplTest extends BeamSqlDslBase {
-
- @Rule public ExpectedException expectedEx = ExpectedException.none();
-
- @Test
- public void testIsInf() throws Exception {
- Schema resultType =
- Schema.builder()
- .addBooleanField("field_1")
- .addBooleanField("field_2")
- .addBooleanField("field_3")
- .addBooleanField("field_4")
- .build();
- Row resultRow = Row.withSchema(resultType).addValues(true, true, true,
true).build();
-
- String sql =
- "SELECT IS_INF(f_float_1), IS_INF(f_double_1), IS_INF(f_float_2),
IS_INF(f_double_2) FROM PCOLLECTION";
- PCollection<Row> result = boundedInputFloatDouble.apply("testUdf",
SqlTransform.query(sql));
- PAssert.that(result).containsInAnyOrder(resultRow);
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testIsNan() throws Exception {
- Schema resultType =
- Schema.builder()
- .addBooleanField("field_1")
- .addBooleanField("field_2")
- .addBooleanField("field_3")
- .addBooleanField("field_4")
- .build();
- Row resultRow = Row.withSchema(resultType).addValues(false, false, true,
true).build();
-
- String sql =
- "SELECT IS_NAN(f_float_2), IS_NAN(f_double_2), IS_NAN(f_float_3),
IS_NAN(f_double_3) FROM PCOLLECTION";
- PCollection<Row> result = boundedInputFloatDouble.apply("testUdf",
SqlTransform.query(sql));
- PAssert.that(result).containsInAnyOrder(resultRow);
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testCOSH() throws Exception {
- Schema resultType = Schema.builder().addNullableField("field",
Schema.FieldType.DOUBLE).build();
-
- Row resultRow1 =
Row.withSchema(resultType).addValues(Math.cosh(1.0)).build();
- String sql1 = "SELECT COSH(CAST(1.0 as DOUBLE))";
- PCollection<Row> result1 = boundedInput1.apply("testUdf1",
SqlTransform.query(sql1));
- PAssert.that(result1).containsInAnyOrder(resultRow1);
-
- Row resultRow2 =
Row.withSchema(resultType).addValues(Math.cosh(710.0)).build();
- String sql2 = "SELECT COSH(CAST(710.0 as DOUBLE))";
- PCollection<Row> result2 = boundedInput1.apply("testUdf2",
SqlTransform.query(sql2));
- PAssert.that(result2).containsInAnyOrder(resultRow2);
-
- Row resultRow3 =
Row.withSchema(resultType).addValues(Math.cosh(-1.0)).build();
- String sql3 = "SELECT COSH(CAST(-1.0 as DOUBLE))";
- PCollection<Row> result3 = boundedInput1.apply("testUdf3",
SqlTransform.query(sql3));
- PAssert.that(result3).containsInAnyOrder(resultRow3);
-
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testSINH() throws Exception {
- Schema resultType = Schema.builder().addNullableField("field",
Schema.FieldType.DOUBLE).build();
-
- Row resultRow1 =
Row.withSchema(resultType).addValues(Math.sinh(1.0)).build();
- String sql1 = "SELECT SINH(CAST(1.0 as DOUBLE))";
- PCollection<Row> result1 = boundedInput1.apply("testUdf1",
SqlTransform.query(sql1));
- PAssert.that(result1).containsInAnyOrder(resultRow1);
-
- Row resultRow2 =
Row.withSchema(resultType).addValues(Math.sinh(710.0)).build();
- String sql2 = "SELECT SINH(CAST(710.0 as DOUBLE))";
- PCollection<Row> result2 = boundedInput1.apply("testUdf2",
SqlTransform.query(sql2));
- PAssert.that(result2).containsInAnyOrder(resultRow2);
-
- Row resultRow3 =
Row.withSchema(resultType).addValues(Math.sinh(-1.0)).build();
- String sql3 = "SELECT SINH(CAST(-1.0 as DOUBLE))";
- PCollection<Row> result3 = boundedInput1.apply("testUdf3",
SqlTransform.query(sql3));
- PAssert.that(result3).containsInAnyOrder(resultRow3);
-
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testTANH() throws Exception {
- Schema resultType = Schema.builder().addNullableField("field",
Schema.FieldType.DOUBLE).build();
-
- Row resultRow1 =
Row.withSchema(resultType).addValues(Math.tanh(1.0)).build();
- String sql1 = "SELECT TANH(CAST(1.0 as DOUBLE))";
- PCollection<Row> result1 = boundedInput1.apply("testUdf1",
SqlTransform.query(sql1));
- PAssert.that(result1).containsInAnyOrder(resultRow1);
-
- Row resultRow2 =
Row.withSchema(resultType).addValues(Math.tanh(0.0)).build();
- String sql2 = "SELECT TANH(CAST(0.0 as DOUBLE))";
- PCollection<Row> result2 = boundedInput1.apply("testUdf2",
SqlTransform.query(sql2));
- PAssert.that(result2).containsInAnyOrder(resultRow2);
-
- Row resultRow3 =
Row.withSchema(resultType).addValues(Math.tanh(-1.0)).build();
- String sql3 = "SELECT TANH(CAST(-1.0 as DOUBLE))";
- PCollection<Row> result3 = boundedInput1.apply("testUdf3",
SqlTransform.query(sql3));
- PAssert.that(result3).containsInAnyOrder(resultRow3);
-
- pipeline.run().waitUntilFinish();
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 9660295f8a4..37e3b279cbb 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -73,6 +73,7 @@
.put(String.class, TypeName.STRING)
.put(DateTime.class, TypeName.DATETIME)
.put(Boolean.class, TypeName.BOOLEAN)
+ .put(byte[].class, TypeName.BYTES)
.build();
private static final Schema ROW_TYPE =
@@ -243,6 +244,12 @@ public ExpressionChecker addExpr(String expression, Object
expectedValue) {
return this;
}
+ public ExpressionChecker addExprWithNullExpectedValue(
+ String expression, TypeName resultTypeName) {
+ addExpr(expression, null, FieldType.of(resultTypeName));
+ return this;
+ }
+
public ExpressionChecker addExpr(
String expression, Object expectedValue, FieldType resultFieldType) {
exps.add(ExpressionTestCase.of(expression, expectedValue,
resultFieldType));
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 164104)
Time Spent: 5h 20m (was: 5h 10m)
> Function extension in BeamSQL
> -----------------------------
>
> Key: BEAM-5852
> URL: https://issues.apache.org/jira/browse/BEAM-5852
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Rui Wang
> Assignee: Rui Wang
> Priority: Major
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> We could add more functions to BeamSQL (as UDFs) to provide rich
> functionalities than standard/Calcite functions.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)