[
https://issues.apache.org/jira/browse/BEAM-4365?focusedWorklogId=110452&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110452
]
ASF GitHub Bot logged work on BEAM-4365:
----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Jun/18 02:35
Start Date: 10/Jun/18 02:35
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5433: [BEAM-4365] Make
BeamSqlExpression for operators, use it for string operators
URL: https://github.com/apache/beam/pull/5433
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 40d23793852..6dbbef52594 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -29,11 +29,13 @@
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDotExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlOperatorExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUdfExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.StringOperators;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlModExpression;
@@ -91,15 +93,6 @@
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row.BeamSqlFieldAccessExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -351,32 +344,32 @@ static BeamSqlExpression buildExpression(RexNode rexNode)
{
// string operators
case "||":
- ret = new BeamSqlConcatExpression(subExps);
+ ret = new BeamSqlOperatorExpression(StringOperators.CONCAT, subExps);
break;
case "POSITION":
- ret = new BeamSqlPositionExpression(subExps);
+ ret = new BeamSqlOperatorExpression(StringOperators.POSITION,
subExps);
break;
case "CHAR_LENGTH":
case "CHARACTER_LENGTH":
- ret = new BeamSqlCharLengthExpression(subExps);
+ ret = new BeamSqlOperatorExpression(StringOperators.CHAR_LENGTH,
subExps);
break;
case "UPPER":
- ret = new BeamSqlUpperExpression(subExps);
+ ret = new BeamSqlOperatorExpression(StringOperators.UPPER, subExps);
break;
case "LOWER":
- ret = new BeamSqlLowerExpression(subExps);
+ ret = new BeamSqlOperatorExpression(StringOperators.LOWER, subExps);
break;
case "TRIM":
- ret = new BeamSqlTrimExpression(subExps);
+ ret = new BeamSqlOperatorExpression(StringOperators.TRIM, subExps);
break;
case "SUBSTRING":
- ret = new BeamSqlSubstringExpression(subExps);
+ ret = new BeamSqlOperatorExpression(StringOperators.SUBSTRING,
subExps);
break;
case "OVERLAY":
- ret = new BeamSqlOverlayExpression(subExps);
+ ret = new BeamSqlOperatorExpression(StringOperators.OVERLAY,
subExps);
break;
case "INITCAP":
- ret = new BeamSqlInitCapExpression(subExps);
+ ret = new BeamSqlOperatorExpression(StringOperators.INIT_CAP,
subExps);
break;
// date functions
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlBinaryOperator.java
similarity index 51%
rename from
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
rename to
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlBinaryOperator.java
index 0e94463b1d5..dc6dab5f0a5 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlBinaryOperator.java
@@ -15,27 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.collect.ImmutableMap;
import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-/** 'UPPER' operator. */
-public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
- public BeamSqlUpperExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
+/** An operator that is applied to already-evaluated arguments. */
+public interface BeamSqlBinaryOperator extends BeamSqlOperator {
+ default BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments) {
+ checkArgument(arguments.size() == 2, "Unary operator %s received more than
one argument", this);
+ return apply(arguments.get(0), arguments.get(1));
}
- @Override
- public BeamSqlPrimitive evaluate(
- Row inputRow, BoundedWindow window, ImmutableMap<Integer, Object>
correlateEnv) {
- String str = opValueEvaluated(0, inputRow, window, correlateEnv);
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
+ default boolean accept(List<BeamSqlExpression> arguments) {
+ return arguments.size() == 2 && accept(arguments.get(0), arguments.get(1));
}
+
+ boolean accept(BeamSqlExpression left, BeamSqlExpression right);
+
+ BeamSqlPrimitive apply(BeamSqlPrimitive left, BeamSqlPrimitive right);
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperator.java
similarity index 69%
rename from
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java
rename to
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperator.java
index e75c8ff9818..d380b1d8403 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperator.java
@@ -15,6 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
-/** String operators. */
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/** An operator that is applied to already-evaluated arguments. */
+public interface BeamSqlOperator extends Serializable {
+ boolean accept(List<BeamSqlExpression> arguments);
+
+ SqlTypeName getOutputType();
+
+ BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments);
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperatorExpression.java
similarity index 62%
rename from
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
rename to
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperatorExpression.java
index 0abb843ed91..606bde15c16 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperatorExpression.java
@@ -15,27 +15,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import com.google.common.collect.ImmutableMap;
import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-/** 'CHAR_LENGTH' operator. */
-public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
- public BeamSqlCharLengthExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.INTEGER);
+/** A generic expression form for an operator applied to arguments. */
+public class BeamSqlOperatorExpression extends BeamSqlExpression {
+
+ private final BeamSqlOperator operator;
+
+ public BeamSqlOperatorExpression(BeamSqlOperator operator,
List<BeamSqlExpression> operands) {
+ super(operands, operator.getOutputType());
+ this.operator = operator;
+ }
+
+ @Override
+ public boolean accept() {
+ return operator.accept(operands);
}
@Override
public BeamSqlPrimitive evaluate(
Row inputRow, BoundedWindow window, ImmutableMap<Integer, Object>
correlateEnv) {
- String str = opValueEvaluated(0, inputRow, window, correlateEnv);
- return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
+ List<BeamSqlPrimitive> arguments =
+ operands
+ .stream()
+ .map(operand -> operand.evaluate(inputRow, window, correlateEnv))
+ .collect(Collectors.toList());
+
+ return operator.apply(arguments);
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUnaryOperator.java
similarity index 58%
rename from
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpression.java
rename to
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUnaryOperator.java
index 13e2c10e27f..b1e0cf73929 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpression.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUnaryOperator.java
@@ -15,29 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.calcite.sql.type.SqlTypeName;
-/** Base class for all string unary operators. */
-public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression {
- public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands,
SqlTypeName outputType) {
- super(operands, outputType);
+/** An operator that is applied to already-evaluated arguments. */
+public interface BeamSqlUnaryOperator extends BeamSqlOperator {
+
+ default BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments) {
+ checkArgument(arguments.size() == 1, "Unary operator %s received more than
one argument", this);
+ return apply(arguments.get(0));
}
- @Override
- public boolean accept() {
- if (operands.size() != 1) {
- return false;
- }
+ default boolean accept(List<BeamSqlExpression> arguments) {
+ return arguments.size() == 1 && accept(arguments.get(0));
+ }
- if (!SqlTypeName.CHAR_TYPES.contains(opType(0))) {
- return false;
- }
+ boolean accept(BeamSqlExpression argument);
- return true;
- }
+ BeamSqlPrimitive apply(BeamSqlPrimitive argument);
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperators.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperators.java
new file mode 100644
index 00000000000..632699d81be
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperators.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.List;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/** String operator implementations. */
+public class StringOperators {
+
+ /** A {@link BeamSqlOperator} that returns a string. */
+ public interface StringOperator extends BeamSqlOperator {
+ default SqlTypeName getOutputType() {
+ return SqlTypeName.VARCHAR;
+ }
+ }
+
+ @FunctionalInterface
+ private interface StringUnaryOperator extends BeamSqlUnaryOperator {
+ default boolean accept(BeamSqlExpression arg) {
+ return SqlTypeName.CHAR_TYPES.contains(arg.getOutputType());
+ }
+
+ default SqlTypeName getOutputType() {
+ return SqlTypeName.VARCHAR;
+ }
+ }
+
+ public static final BeamSqlOperator CHAR_LENGTH =
+ (StringUnaryOperator)
+ (BeamSqlPrimitive arg) ->
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER,
SqlFunctions.charLength(arg.getString()));
+
+ public static final BeamSqlOperator UPPER =
+ (StringUnaryOperator)
+ (BeamSqlPrimitive arg) ->
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
SqlFunctions.upper(arg.getString()));
+
+ public static final BeamSqlOperator LOWER =
+ (StringUnaryOperator)
+ (BeamSqlPrimitive arg) ->
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
SqlFunctions.lower(arg.getString()));
+
+ /** {@code INITCAP}. */
+ public static final BeamSqlOperator INIT_CAP =
+ (StringUnaryOperator)
+ (BeamSqlPrimitive arg) ->
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
SqlFunctions.initcap(arg.getString()));
+
+ public static final BeamSqlBinaryOperator CONCAT =
+ new BeamSqlBinaryOperator() {
+ @Override
+ public SqlTypeName getOutputType() {
+ return SqlTypeName.VARCHAR;
+ }
+
+ @Override
+ public boolean accept(BeamSqlExpression left, BeamSqlExpression right)
{
+ return SqlTypeName.CHAR_TYPES.contains(left.getOutputType())
+ && SqlTypeName.CHAR_TYPES.contains(right.getOutputType());
+ }
+
+ @Override
+ public BeamSqlPrimitive apply(BeamSqlPrimitive left, BeamSqlPrimitive
right) {
+ return BeamSqlPrimitive.of(
+ SqlTypeName.VARCHAR, SqlFunctions.concat(left.getString(),
right.getString()));
+ }
+ };
+
+ public static final BeamSqlOperator POSITION =
+ new StringOperator() {
+ @Override
+ public boolean accept(List<BeamSqlExpression> operands) {
+ if (operands.size() < 2 || operands.size() > 3) {
+ return false;
+ }
+
+ return
SqlTypeName.CHAR_TYPES.contains(operands.get(0).getOutputType())
+ &&
SqlTypeName.CHAR_TYPES.contains(operands.get(1).getOutputType())
+ && ((operands.size() < 3)
+ ||
SqlTypeName.INT_TYPES.contains(operands.get(2).getOutputType()));
+ }
+
+ @Override
+ public BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments) {
+ String targetStr = arguments.get(0).getString();
+ String containingStr = arguments.get(1).getString();
+ int from = arguments.size() < 3 ? 1 : arguments.get(2).getInteger();
+ return BeamSqlPrimitive.of(
+ SqlTypeName.INTEGER, SqlFunctions.position(targetStr,
containingStr, from));
+ }
+ };
+
+ public static final BeamSqlOperator TRIM =
+ new StringOperator() {
+ @Override
+ public boolean accept(List<BeamSqlExpression> subexpressions) {
+ return (subexpressions.size() == 1
+ &&
SqlTypeName.CHAR_TYPES.contains(subexpressions.get(0).getOutputType()))
+ || (subexpressions.size() == 3
+ &&
SqlTypeName.SYMBOL.equals(subexpressions.get(0).getOutputType())
+ &&
SqlTypeName.CHAR_TYPES.contains(subexpressions.get(1).getOutputType())
+ &&
SqlTypeName.CHAR_TYPES.contains(subexpressions.get(2).getOutputType()));
+ }
+
+ @Override
+ public BeamSqlPrimitive apply(List<BeamSqlPrimitive> operands) {
+ SqlTrimFunction.Flag type;
+ String removeChars;
+ String fromStr;
+
+ if (operands.size() == 1) {
+ type = SqlTrimFunction.Flag.BOTH;
+ removeChars = " ";
+ fromStr = operands.get(0).getString();
+ } else {
+ type = (SqlTrimFunction.Flag) operands.get(0).getValue();
+ removeChars = operands.get(1).getString();
+ fromStr = operands.get(2).getString();
+ }
+
+ return BeamSqlPrimitive.of(
+ SqlTypeName.VARCHAR,
+ trim(
+ type == SqlTrimFunction.Flag.BOTH || type ==
SqlTrimFunction.Flag.LEADING,
+ type == SqlTrimFunction.Flag.BOTH || type ==
SqlTrimFunction.Flag.TRAILING,
+ removeChars,
+ fromStr));
+ }
+ };
+
+ /**
+ * Calcite's implementation of TRIM is incorrect and only trims the first
character in removeStr.
+ *
+ * <p>This implementation deliberately kept compatible with eventual
upstreaming.
+ */
+ private static String trim(
+ boolean leading, boolean trailing, String removeChars, String fromStr) {
+ int j = fromStr.length();
+ if (trailing) {
+ for (; ; ) {
+ if (j == 0) {
+ return "";
+ }
+ if (removeChars.indexOf(fromStr.charAt(j - 1)) < 0) {
+ break;
+ }
+ --j;
+ }
+ }
+ int i = 0;
+ if (leading) {
+ for (; ; ) {
+ if (i == j) {
+ return "";
+ }
+ if (removeChars.indexOf(fromStr.charAt(i)) < 0) {
+ break;
+ }
+ ++i;
+ }
+ }
+ return fromStr.substring(i, j);
+ }
+
+ public static final BeamSqlOperator OVERLAY =
+ new StringOperator() {
+ @Override
+ public boolean accept(List<BeamSqlExpression> operands) {
+ return (operands.size() == 3
+ &&
SqlTypeName.CHAR_TYPES.contains(operands.get(0).getOutputType())
+ &&
SqlTypeName.CHAR_TYPES.contains(operands.get(1).getOutputType())
+ &&
SqlTypeName.INT_TYPES.contains(operands.get(2).getOutputType()))
+ || (operands.size() == 4
+ &&
SqlTypeName.CHAR_TYPES.contains(operands.get(0).getOutputType())
+ &&
SqlTypeName.CHAR_TYPES.contains(operands.get(1).getOutputType())
+ &&
SqlTypeName.INT_TYPES.contains(operands.get(2).getOutputType()))
+ &&
SqlTypeName.INT_TYPES.contains(operands.get(3).getOutputType());
+ }
+
+ @Override
+ public BeamSqlPrimitive apply(List<BeamSqlPrimitive> operands) {
+ String str = operands.get(0).getString();
+ String replaceStr = operands.get(1).getString();
+ int idx = operands.get(2).getInteger();
+ int length = operands.size() == 4 ? operands.get(3).getInteger() :
replaceStr.length();
+ return BeamSqlPrimitive.of(
+ SqlTypeName.VARCHAR, SqlFunctions.overlay(str, replaceStr, idx,
length));
+ }
+ };
+
+ public static final BeamSqlOperator SUBSTRING =
+ new StringOperator() {
+
+ @Override
+ public boolean accept(List<BeamSqlExpression> operands) {
+ return (operands.size() == 2
+ &&
SqlTypeName.CHAR_TYPES.contains(operands.get(0).getOutputType())
+ &&
SqlTypeName.INT_TYPES.contains(operands.get(1).getOutputType()))
+ || (operands.size() == 3
+ &&
SqlTypeName.CHAR_TYPES.contains(operands.get(0).getOutputType())
+ &&
SqlTypeName.INT_TYPES.contains(operands.get(1).getOutputType())
+ &&
SqlTypeName.INT_TYPES.contains(operands.get(2).getOutputType()));
+ }
+
+ @Override
+ public BeamSqlPrimitive apply(List<BeamSqlPrimitive> operands) {
+ String str = operands.get(0).getString();
+ int from = operands.get(1).getInteger();
+ if (from < 0) {
+ from = from + str.length() + 1;
+ }
+
+ if (operands.size() == 3) {
+ int length = operands.get(2).getInteger();
+
+ return BeamSqlPrimitive.of(
+ SqlTypeName.VARCHAR, SqlFunctions.substring(str, from,
length));
+ } else {
+ return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
SqlFunctions.substring(str, from));
+ }
+ }
+ };
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
deleted file mode 100644
index 72899c2eb2b..00000000000
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/** String concat operator. */
-public class BeamSqlConcatExpression extends BeamSqlExpression {
-
- protected BeamSqlConcatExpression(List<BeamSqlExpression> operands,
SqlTypeName outputType) {
- super(operands, outputType);
- }
-
- public BeamSqlConcatExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override
- public boolean accept() {
- if (operands.size() != 2) {
- return false;
- }
-
- for (BeamSqlExpression exp : getOperands()) {
- if (!SqlTypeName.CHAR_TYPES.contains(exp.getOutputType())) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
- public BeamSqlPrimitive evaluate(
- Row inputRow, BoundedWindow window, ImmutableMap<Integer, Object>
correlateEnv) {
- String left = opValueEvaluated(0, inputRow, window, correlateEnv);
- String right = opValueEvaluated(1, inputRow, window, correlateEnv);
-
- return BeamSqlPrimitive.of(
- SqlTypeName.VARCHAR,
- new StringBuilder(left.length() +
right.length()).append(left).append(right).toString());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
deleted file mode 100644
index 3927e005f73..00000000000
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/** 'INITCAP' operator. */
-public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
- public BeamSqlInitCapExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override
- public BeamSqlPrimitive evaluate(
- Row inputRow, BoundedWindow window, ImmutableMap<Integer, Object>
correlateEnv) {
- String str = opValueEvaluated(0, inputRow, window, correlateEnv);
-
- StringBuilder ret = new StringBuilder(str);
- boolean isInit = true;
- for (int i = 0; i < str.length(); i++) {
- if (Character.isWhitespace(str.charAt(i))) {
- isInit = true;
- continue;
- }
-
- if (isInit) {
- ret.setCharAt(i, Character.toUpperCase(str.charAt(i)));
- isInit = false;
- } else {
- ret.setCharAt(i, Character.toLowerCase(str.charAt(i)));
- }
- }
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ret.toString());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
deleted file mode 100644
index ab0b7bac820..00000000000
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/** 'LOWER' operator. */
-public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
- public BeamSqlLowerExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override
- public BeamSqlPrimitive evaluate(
- Row inputRow, BoundedWindow window, ImmutableMap<Integer, Object>
correlateEnv) {
- String str = opValueEvaluated(0, inputRow, window, correlateEnv);
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
deleted file mode 100644
index dac0775f783..00000000000
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'OVERLAY' operator.
- *
- * <p>OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ])
- */
-public class BeamSqlOverlayExpression extends BeamSqlExpression {
- public BeamSqlOverlayExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override
- public boolean accept() {
- if (operands.size() < 3 || operands.size() > 4) {
- return false;
- }
-
- if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
- || !SqlTypeName.CHAR_TYPES.contains(opType(1))
- || !SqlTypeName.INT_TYPES.contains(opType(2))) {
- return false;
- }
-
- if (operands.size() == 4 && !SqlTypeName.INT_TYPES.contains(opType(3))) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public BeamSqlPrimitive evaluate(
- Row inputRow, BoundedWindow window, ImmutableMap<Integer, Object>
correlateEnv) {
- String str = opValueEvaluated(0, inputRow, window, correlateEnv);
- String replaceStr = opValueEvaluated(1, inputRow, window, correlateEnv);
- int idx = opValueEvaluated(2, inputRow, window, correlateEnv);
- // the index is 1 based.
- idx -= 1;
- int length = replaceStr.length();
- if (operands.size() == 4) {
- length = opValueEvaluated(3, inputRow, window, correlateEnv);
- }
-
- StringBuilder result = new StringBuilder(str.length() +
replaceStr.length() - length);
- result.append(str.substring(0,
idx)).append(replaceStr).append(str.substring(idx + length));
-
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
deleted file mode 100644
index dc8db85a945..00000000000
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * String position operator.
- *
- * <p>example: POSITION(string1 IN string2) POSITION(string1 IN string2 FROM
integer)
- */
-public class BeamSqlPositionExpression extends BeamSqlExpression {
- public BeamSqlPositionExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.INTEGER);
- }
-
- @Override
- public boolean accept() {
- if (operands.size() < 2 || operands.size() > 3) {
- return false;
- }
-
- if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
- || !SqlTypeName.CHAR_TYPES.contains(opType(1))) {
- return false;
- }
-
- if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public BeamSqlPrimitive evaluate(
- Row inputRow, BoundedWindow window, ImmutableMap<Integer, Object>
correlateEnv) {
- String targetStr = opValueEvaluated(0, inputRow, window, correlateEnv);
- String containingStr = opValueEvaluated(1, inputRow, window, correlateEnv);
- int from = -1;
- if (operands.size() == 3) {
- Number tmp = opValueEvaluated(2, inputRow, window, correlateEnv);
- from = tmp.intValue();
- }
-
- int idx = containingStr.indexOf(targetStr, from);
-
- return BeamSqlPrimitive.of(SqlTypeName.INTEGER, idx);
- }
-}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
deleted file mode 100644
index d8ec59bdcd4..00000000000
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'SUBSTRING' operator.
- *
- * <p>SUBSTRING(string FROM integer) SUBSTRING(string FROM integer FOR integer)
- */
-public class BeamSqlSubstringExpression extends BeamSqlExpression {
- public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override
- public boolean accept() {
- if (operands.size() < 2 || operands.size() > 3) {
- return false;
- }
-
- if (!SqlTypeName.CHAR_TYPES.contains(opType(0)) ||
!SqlTypeName.INT_TYPES.contains(opType(1))) {
- return false;
- }
-
- if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public BeamSqlPrimitive evaluate(
- Row inputRow, BoundedWindow window, ImmutableMap<Integer, Object>
correlateEnv) {
- String str = opValueEvaluated(0, inputRow, window, correlateEnv);
- int idx = opValueEvaluated(1, inputRow, window, correlateEnv);
- int startIdx = idx;
- if (startIdx > 0) {
- // NOTE: SQL substring is 1 based(rather than 0 based)
- startIdx -= 1;
- } else if (startIdx < 0) {
- // NOTE: SQL also support negative index...
- startIdx += str.length();
- } else {
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
- }
-
- if (operands.size() == 3) {
- int length = opValueEvaluated(2, inputRow, window, correlateEnv);
- if (length < 0) {
- length = 0;
- }
- int endIdx = Math.min(startIdx + length, str.length());
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx,
endIdx));
- } else {
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx));
- }
- }
-}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
deleted file mode 100644
index a913e437965..00000000000
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Trim operator.
- *
- * <p>TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2)
- */
-public class BeamSqlTrimExpression extends BeamSqlExpression {
- public BeamSqlTrimExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override
- public boolean accept() {
- if (operands.size() != 1 && operands.size() != 3) {
- return false;
- }
-
- if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) {
- return false;
- }
-
- if (operands.size() == 3
- && (SqlTypeName.SYMBOL != opType(0)
- || !SqlTypeName.CHAR_TYPES.contains(opType(1))
- || !SqlTypeName.CHAR_TYPES.contains(opType(2)))) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public BeamSqlPrimitive evaluate(
- Row inputRow, BoundedWindow window, ImmutableMap<Integer, Object>
correlateEnv) {
- if (operands.size() == 1) {
- return BeamSqlPrimitive.of(
- SqlTypeName.VARCHAR,
- opValueEvaluated(0, inputRow, window,
correlateEnv).toString().trim());
- } else {
- SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow, window,
correlateEnv);
- String targetStr = opValueEvaluated(1, inputRow, window, correlateEnv);
- String containingStr = opValueEvaluated(2, inputRow, window,
correlateEnv);
-
- switch (type) {
- case LEADING:
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
leadingTrim(containingStr, targetStr));
- case TRAILING:
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
trailingTrim(containingStr, targetStr));
- case BOTH:
- default:
- return BeamSqlPrimitive.of(
- SqlTypeName.VARCHAR, trailingTrim(leadingTrim(containingStr,
targetStr), targetStr));
- }
- }
- }
-
- static String leadingTrim(String containingStr, String targetStr) {
- int idx = 0;
- while (containingStr.startsWith(targetStr, idx)) {
- idx += targetStr.length();
- }
-
- return containingStr.substring(idx);
- }
-
- static String trailingTrim(String containingStr, String targetStr) {
- int idx = containingStr.length() - targetStr.length();
- while (containingStr.startsWith(targetStr, idx)) {
- idx -= targetStr.length();
- }
-
- idx += targetStr.length();
- return containingStr.substring(0, idx);
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
index 1ae19b1fdf6..33dcc8abff2 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
@@ -24,7 +24,6 @@
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
@@ -47,15 +46,6 @@
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -66,7 +56,6 @@
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.Assert;
@@ -209,120 +198,6 @@ private void testBuildArithmeticExpression(
assertTrue(exp.getClass().equals(clazz));
}
- @Test
- public void testBuildExpression_string() {
- RexNode rexNode;
- BeamSqlExpression exp;
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.CONCAT,
- Arrays.asList(rexBuilder.makeLiteral("hello "),
rexBuilder.makeLiteral("world")));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlConcatExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.POSITION,
- Arrays.asList(rexBuilder.makeLiteral("hello"),
rexBuilder.makeLiteral("worldhello")));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlPositionExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.POSITION,
- Arrays.asList(
- rexBuilder.makeLiteral("hello"),
- rexBuilder.makeLiteral("worldhello"),
- rexBuilder.makeCast(
- TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
- rexBuilder.makeBigintLiteral(BigDecimal.ONE))));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlPositionExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.CHAR_LENGTH,
Arrays.asList(rexBuilder.makeLiteral("hello")));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlCharLengthExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.UPPER,
Arrays.asList(rexBuilder.makeLiteral("hello")));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlUpperExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.LOWER,
Arrays.asList(rexBuilder.makeLiteral("HELLO")));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlLowerExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.INITCAP,
Arrays.asList(rexBuilder.makeLiteral("hello")));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlInitCapExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.TRIM,
- Arrays.asList(
- rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
- rexBuilder.makeLiteral("HELLO"),
- rexBuilder.makeLiteral("HELLO")));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlTrimExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.SUBSTRING,
- Arrays.asList(
- rexBuilder.makeLiteral("HELLO"),
rexBuilder.makeBigintLiteral(BigDecimal.ZERO)));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlSubstringExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.SUBSTRING,
- Arrays.asList(
- rexBuilder.makeLiteral("HELLO"),
- rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
- rexBuilder.makeBigintLiteral(BigDecimal.ZERO)));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlSubstringExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.OVERLAY,
- Arrays.asList(
- rexBuilder.makeLiteral("HELLO"),
- rexBuilder.makeLiteral("HELLO"),
- rexBuilder.makeBigintLiteral(BigDecimal.ZERO)));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlOverlayExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.OVERLAY,
- Arrays.asList(
- rexBuilder.makeLiteral("HELLO"),
- rexBuilder.makeLiteral("HELLO"),
- rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
- rexBuilder.makeBigintLiteral(BigDecimal.ZERO)));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlOverlayExpression);
-
- rexNode =
- rexBuilder.makeCall(
- SqlStdOperatorTable.CASE,
- Arrays.asList(
- rexBuilder.makeLiteral(true),
- rexBuilder.makeLiteral("HELLO"),
- rexBuilder.makeLiteral("HELLO")));
- exp = BeamSqlFnExecutor.buildExpression(rexNode);
- assertTrue(exp instanceof BeamSqlCaseExpression);
- }
-
@Test
public void testBuildExpression_date() {
RexNode rexNode;
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperatorsTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperatorsTest.java
new file mode 100644
index 00000000000..6e6c61e9b9c
--- /dev/null
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperatorsTest.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test of BeamSqlUpperExpression. */
+@RunWith(Enclosed.class)
+public class StringOperatorsTest extends BeamSqlFnExecutorTestBase {
+
+ /** Tests for UPPER. */
+ @RunWith(JUnit4.class)
+ public static class UpperTest {
+ @Test
+ public void testUpper() {
+ assertThat(
+ StringOperators.UPPER
+ .apply(ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
"hello")))
+ .getValue(),
+ equalTo("HELLO"));
+ }
+ }
+
+ /** Tests for LOWER. */
+ @RunWith(JUnit4.class)
+ public static class LowerTest {
+ @Test
+ public void testLower() {
+ assertThat(
+ StringOperators.LOWER
+ .apply(ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
"HELLo")))
+ .getValue(),
+ equalTo("hello"));
+ }
+ }
+
+ /** Tests for TRIM. */
+ @RunWith(JUnit4.class)
+ public static class TrimTest {
+
+ @Test
+ public void testAcceptOne() {
+ assertTrue(
+ StringOperators.TRIM.accept(
+ ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "
hello "))));
+ }
+
+ @Test
+ public void testAcceptThree() {
+ assertTrue(
+ StringOperators.TRIM.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.SYMBOL,
SqlTrimFunction.Flag.BOTH),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"))));
+ }
+
+ @Test
+ public void testRejectTwo() {
+ assertFalse(
+ StringOperators.TRIM.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"))));
+ }
+
+ @Test
+ public void testLeading() throws Exception {
+ assertThat(
+ StringOperators.TRIM
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.SYMBOL,
SqlTrimFunction.Flag.LEADING),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "eh"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")))
+ .getValue(),
+ equalTo("__hehe"));
+ }
+
+ @Test
+ public void testTrailing() {
+ assertThat(
+ StringOperators.TRIM
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.SYMBOL,
SqlTrimFunction.Flag.TRAILING),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "eh"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")))
+ .getValue(),
+ equalTo("hehe__"));
+ }
+
+ @Test
+ public void testBoth() {
+ assertThat(
+ StringOperators.TRIM
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.SYMBOL,
SqlTrimFunction.Flag.BOTH),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "eh"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")))
+ .getValue(),
+ equalTo("__"));
+ }
+
+ @Test
+ public void testDefault() {
+ assertThat(
+ StringOperators.TRIM
+ .apply(ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
" hello ")))
+ .getValue(),
+ equalTo("hello"));
+ }
+ }
+
+ /** Tests for CHAR_LENGTH. */
+ @RunWith(JUnit4.class)
+ public static class CharLengthTest {
+
+ @Test
+ public void testSimple() {
+ assertThat(
+ StringOperators.CHAR_LENGTH
+ .apply(ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
"hello")))
+ .getValue(),
+ equalTo(5));
+ }
+
+ @Test
+ public void testAccept() {
+ assertTrue(
+ StringOperators.CHAR_LENGTH.accept(
+ ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
"hello"))));
+ }
+
+ @Test
+ public void testRejectNonString() {
+ assertFalse(
+ StringOperators.CHAR_LENGTH.accept(
+ ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))));
+ }
+
+ @Test
+ public void testRejectTooManyArgs() {
+ assertFalse(
+ StringOperators.CHAR_LENGTH.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"))));
+ }
+ }
+
+ /** Tests for Concat operator. */
+ @RunWith(JUnit4.class)
+ public static class BeamSqlConcatExpressionTest extends
BeamSqlFnExecutorTestBase {
+
+ @Test
+ public void accept() throws Exception {
+ assertTrue(
+ StringOperators.CONCAT.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"))));
+ }
+
+ @Test
+ public void rejectNonString() throws Exception {
+ assertFalse(
+ StringOperators.CONCAT.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"))));
+ }
+
+ @Test
+ public void rejectTooMany() throws Exception {
+ assertFalse(
+ StringOperators.CONCAT.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"))));
+ }
+
+ @Test
+ public void testApply() throws Exception {
+ assertThat(
+ StringOperators.CONCAT
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " world")))
+ .getValue(),
+ equalTo("hello world"));
+ }
+ }
+
+ /** Test for SUBSTRING. */
+ public static class SubstringTest {
+
+ @Test
+ public void testAcceptTwoArgs() throws Exception {
+ assertTrue(
+ StringOperators.SUBSTRING.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))));
+ }
+
+ @Test
+ public void testAcceptThreeArgs() {
+ assertTrue(
+ StringOperators.SUBSTRING.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2))));
+ }
+
+ @Test
+ public void testApplyWhole() {
+ assertThat(
+ StringOperators.SUBSTRING
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)))
+ .getValue(),
+ equalTo("hello"));
+ }
+
+ @Test
+ public void testApplySubstring() {
+ assertThat(
+ StringOperators.SUBSTRING
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)))
+ .getValue(),
+ equalTo("he"));
+ }
+
+ @Test
+ public void testApplyExactLength() {
+ assertThat(
+ StringOperators.SUBSTRING
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5)))
+ .getValue(),
+ equalTo("hello"));
+ }
+
+ @Test
+ public void testApplyExceedsLength() {
+ assertThat(
+ StringOperators.SUBSTRING
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100)))
+ .getValue(),
+ equalTo("hello"));
+ }
+
+ @Test
+ public void testApplyNegativeLength() {
+ assertThat(
+ StringOperators.SUBSTRING
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0)))
+ .getValue(),
+ equalTo(""));
+ }
+
+ @Test
+ public void testApplyNegativeStartpoint() {
+ assertThat(
+ StringOperators.SUBSTRING
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1)))
+ .getValue(),
+ equalTo("o"));
+ }
+ }
+
+ /** Test for POSITION. */
+ public static class PositionTest {
+ @Test
+ public void testAcceptTwoArgs() throws Exception {
+ assertTrue(
+ StringOperators.POSITION.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"))));
+ }
+
+ @Test
+ public void testAcceptTrheeArgs() throws Exception {
+ assertTrue(
+ StringOperators.POSITION.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))));
+ }
+
+ @Test
+ public void testReject() {
+ assertFalse(
+ StringOperators.POSITION.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"))));
+ }
+
+ @Test
+ public void testRejectTwoMany() {
+ assertFalse(
+ StringOperators.POSITION.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))));
+ }
+
+ @Test
+ public void testBasic() {
+ assertThat(
+ StringOperators.POSITION
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")))
+ .getValue(),
+ equalTo(6));
+ }
+
+ @Test
+ public void testThreeArgs() {
+ assertThat(
+ StringOperators.POSITION
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)))
+ .getValue(),
+ equalTo(6));
+ }
+
+ @Test
+ public void testThreeArgsNotFound() {
+ assertThat(
+ StringOperators.POSITION
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)))
+ .getValue(),
+ equalTo(0));
+ }
+ }
+
+ /** Test for BeamSqlOverlayExpression. */
+ public static class OverlayTest {
+
+ @Test
+ public void acceptThreeArgs() {
+ assertTrue(
+ StringOperators.OVERLAY.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))));
+ }
+
+ @Test
+ public void acceptFourArgs() {
+ assertTrue(
+ StringOperators.OVERLAY.accept(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2))));
+ }
+
+ @Test
+ public void testOverlayBasic() {
+ assertThat(
+ StringOperators.OVERLAY
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)))
+ .getValue(),
+ equalTo("w3resou3rce"));
+ }
+
+ @Test
+ public void testOverlayFourArgs() {
+ assertThat(
+ StringOperators.OVERLAY
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4)))
+ .getValue(),
+ equalTo("w3resou33rce"));
+ }
+
+ @Test
+ public void testOverlayFourArgs2() {
+ assertThat(
+ StringOperators.OVERLAY
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5)))
+ .getValue(),
+ equalTo("w3resou3rce"));
+ }
+
+ @Test
+ public void testOverlayBigGap() {
+ assertThat(
+ StringOperators.OVERLAY
+ .apply(
+ ImmutableList.of(
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
+ BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7)))
+ .getValue(),
+ equalTo("w3resouce"));
+ }
+ }
+
+ /** Test of BeamSqlInitCapExpression. */
+ @RunWith(JUnit4.class)
+ public static class InitCapTest {
+
+ @Test
+ public void testTwoWords() {
+ assertThat(
+ StringOperators.INIT_CAP
+ .apply(ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
"hello world")))
+ .getValue(),
+ equalTo("Hello World"));
+ }
+
+ @Test
+ public void testTwoWordsWonky() {
+ assertThat(
+ StringOperators.INIT_CAP
+ .apply(ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
"hEllO wOrld")))
+ .getValue(),
+ equalTo("Hello World"));
+ }
+
+ @Test
+ public void testTwoWordsSpacedOut() {
+ assertThat(
+ StringOperators.INIT_CAP
+ .apply(ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
"hello world")))
+ .getValue(),
+ equalTo("Hello World"));
+ }
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
deleted file mode 100644
index 3327f3ed9b8..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/** Test for BeamSqlCharLengthExpression. */
-public class BeamSqlCharLengthExpressionTest extends BeamSqlFnExecutorTestBase
{
-
- @Test
- public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- assertEquals(
- 5,
- new BeamSqlCharLengthExpression(operands)
- .evaluate(row, null, ImmutableMap.of())
- .getValue());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
deleted file mode 100644
index e6e1176d485..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/** Test for BeamSqlConcatExpression. */
-public class BeamSqlConcatExpressionTest extends BeamSqlFnExecutorTestBase {
-
- @Test
- public void accept() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
- assertTrue(new BeamSqlConcatExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
- assertFalse(new BeamSqlConcatExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- assertFalse(new BeamSqlConcatExpression(operands).accept());
- }
-
- @Test
- public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " world"));
- Assert.assertEquals(
- "hello world",
- new BeamSqlConcatExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
deleted file mode 100644
index 5682b07bf8b..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/** Test of BeamSqlInitCapExpression. */
-public class BeamSqlInitCapExpressionTest extends BeamSqlFnExecutorTestBase {
-
- @Test
- public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello world"));
- assertEquals(
- "Hello World",
- new BeamSqlInitCapExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hEllO wOrld"));
- assertEquals(
- "Hello World",
- new BeamSqlInitCapExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello world"));
- assertEquals(
- "Hello World",
- new BeamSqlInitCapExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
deleted file mode 100644
index eae0dc54460..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/** Test of BeamSqlLowerExpression. */
-public class BeamSqlLowerExpressionTest extends BeamSqlFnExecutorTestBase {
-
- @Test
- public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "HELLO"));
- assertEquals(
- "hello",
- new BeamSqlLowerExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
deleted file mode 100644
index 140527ff512..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/** Test for BeamSqlOverlayExpression. */
-public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
-
- @Test
- public void accept() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertTrue(new BeamSqlOverlayExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
- assertTrue(new BeamSqlOverlayExpression(operands).accept());
- }
-
- @Test
- public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
- Assert.assertEquals(
- "w3resou3rce",
- new BeamSqlOverlayExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
- Assert.assertEquals(
- "w3resou33rce",
- new BeamSqlOverlayExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
- Assert.assertEquals(
- "w3resou3rce",
- new BeamSqlOverlayExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7));
- Assert.assertEquals(
- "w3resouce",
- new BeamSqlOverlayExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java
deleted file mode 100644
index ef89890b9a9..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/** Test for BeamSqlPositionExpression. */
-public class BeamSqlPositionExpressionTest extends BeamSqlFnExecutorTestBase {
-
- @Test
- public void accept() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
- assertTrue(new BeamSqlPositionExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertTrue(new BeamSqlPositionExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
- assertFalse(new BeamSqlPositionExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertFalse(new BeamSqlPositionExpression(operands).accept());
- }
-
- @Test
- public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
- assertEquals(
- 5,
- new BeamSqlPositionExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertEquals(
- 5,
- new BeamSqlPositionExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertEquals(
- -1,
- new BeamSqlPositionExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
deleted file mode 100644
index 5ccca97e222..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/** Test for BeamSqlStringUnaryExpression. */
-public class BeamSqlStringUnaryExpressionTest {
-
- @Test
- public void accept() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- assertTrue(new BeamSqlCharLengthExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertFalse(new BeamSqlCharLengthExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- assertFalse(new BeamSqlCharLengthExpression(operands).accept());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
deleted file mode 100644
index 0f8a71ff151..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/** Test for BeamSqlSubstringExpression. */
-public class BeamSqlSubstringExpressionTest extends BeamSqlFnExecutorTestBase {
-
- @Test
- public void accept() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertTrue(new BeamSqlSubstringExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
- assertTrue(new BeamSqlSubstringExpression(operands).accept());
- }
-
- @Test
- public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- assertEquals(
- "hello",
- new BeamSqlSubstringExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
- assertEquals(
- "he",
- new BeamSqlSubstringExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
- assertEquals(
- "hello",
- new BeamSqlSubstringExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100));
- assertEquals(
- "hello",
- new BeamSqlSubstringExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0));
- assertEquals(
- "",
- new BeamSqlSubstringExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
- assertEquals(
- "",
- new BeamSqlSubstringExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
- assertEquals(
- "o",
- new BeamSqlSubstringExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java
deleted file mode 100644
index eefc222cafc..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/** Test for BeamSqlTrimExpression. */
-public class BeamSqlTrimExpressionTest extends BeamSqlFnExecutorTestBase {
-
- @Test
- public void accept() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
- assertTrue(new BeamSqlTrimExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL,
SqlTrimFunction.Flag.BOTH));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
- assertTrue(new BeamSqlTrimExpression(operands).accept());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
- assertFalse(new BeamSqlTrimExpression(operands).accept());
- }
-
- @Test
- public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL,
SqlTrimFunction.Flag.LEADING));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
- assertEquals(
- "__hehe",
- new BeamSqlTrimExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL,
SqlTrimFunction.Flag.TRAILING));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
- assertEquals(
- "hehe__",
- new BeamSqlTrimExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL,
SqlTrimFunction.Flag.BOTH));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "__"));
- assertEquals(
- "__",
- new BeamSqlTrimExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
-
- operands.clear();
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
- assertEquals(
- "hello",
- new BeamSqlTrimExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
- }
-
- @Test
- public void leadingTrim() throws Exception {
- assertEquals("__hehe", BeamSqlTrimExpression.leadingTrim("hehe__hehe",
"he"));
- }
-
- @Test
- public void trailingTrim() throws Exception {
- assertEquals("hehe__", BeamSqlTrimExpression.trailingTrim("hehe__hehe",
"he"));
- }
-
- @Test
- public void trim() throws Exception {
- assertEquals(
- "__",
- BeamSqlTrimExpression.leadingTrim(
- BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"), "he"));
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java
deleted file mode 100644
index ce55b35db3d..00000000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.List;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/** Test of BeamSqlUpperExpression. */
-public class BeamSqlUpperExpressionTest extends BeamSqlFnExecutorTestBase {
-
- @Test
- public void evaluate() throws Exception {
- List<BeamSqlExpression> operands = new ArrayList<>();
-
- operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
- assertEquals(
- "HELLO",
- new BeamSqlUpperExpression(operands).evaluate(row, null,
ImmutableMap.of()).getValue());
- }
-}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
index 3e9c4068843..4d4c1793637 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java
@@ -31,8 +31,8 @@ public void testStringFunctions() throws Exception {
.addExpr("CHARACTER_LENGTH('hello')", 5)
.addExpr("UPPER('hello')", "HELLO")
.addExpr("LOWER('HELLO')", "hello")
- .addExpr("POSITION('world' IN 'helloworld')", 5)
- .addExpr("POSITION('world' IN 'helloworldworld' FROM 7)", 10)
+ .addExpr("POSITION('world' IN 'helloworld')", 6)
+ .addExpr("POSITION('world' IN 'helloworldworld' FROM 7)", 11)
.addExpr("TRIM(' hello ')", "hello")
.addExpr("TRIM(LEADING ' ' FROM ' hello ')", "hello ")
.addExpr("TRIM(TRAILING ' ' FROM ' hello ')", " hello")
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 110452)
Time Spent: 3h 20m (was: 3h 10m)
> SQL operator argument evaluation should have one place where it is managed
> --------------------------------------------------------------------------
>
> Key: BEAM-4365
> URL: https://issues.apache.org/jira/browse/BEAM-4365
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql
> Reporter: Kenneth Knowles
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> The way Beam SQL is factored, each operator has to explicitly ask its
> argument to be evaluated. This should be handled generically at a higher
> level. Since the language is pure and terminating, it is fine for them to
> vary, but given the simplicity of the expression language it makes sense to
> use simple call-by-value.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)