[ 
https://issues.apache.org/jira/browse/BEAM-4365?focusedWorklogId=104149&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104149
 ]

ASF GitHub Bot logged work on BEAM-4365:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/May/18 17:42
            Start Date: 21/May/18 17:42
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5433: [BEAM-4365] Make 
BeamSqlExpression for operators, use it for string operators
URL: https://github.com/apache/beam/pull/5433
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 9de107b7d16..8b0a225f6cb 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -27,11 +27,13 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDotExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlOperatorExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUdfExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.StringOperators;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlModExpression;
@@ -88,15 +90,6 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row.BeamSqlFieldAccessExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -343,32 +336,32 @@ static BeamSqlExpression buildExpression(RexNode rexNode) 
{
 
         // string operators
         case "||":
-          ret = new BeamSqlConcatExpression(subExps);
+          ret = new BeamSqlOperatorExpression(StringOperators.CONCAT, subExps);
           break;
         case "POSITION":
-          ret = new BeamSqlPositionExpression(subExps);
+          ret = new BeamSqlOperatorExpression(StringOperators.POSITION, 
subExps);
           break;
         case "CHAR_LENGTH":
         case "CHARACTER_LENGTH":
-          ret = new BeamSqlCharLengthExpression(subExps);
+          ret = new BeamSqlOperatorExpression(StringOperators.CHAR_LENGTH, 
subExps);
           break;
         case "UPPER":
-          ret = new BeamSqlUpperExpression(subExps);
+          ret = new BeamSqlOperatorExpression(StringOperators.UPPER, subExps);
           break;
         case "LOWER":
-          ret = new BeamSqlLowerExpression(subExps);
+          ret = new BeamSqlOperatorExpression(StringOperators.LOWER, subExps);
           break;
         case "TRIM":
-          ret = new BeamSqlTrimExpression(subExps);
+          ret = new BeamSqlOperatorExpression(StringOperators.TRIM, subExps);
           break;
         case "SUBSTRING":
-          ret = new BeamSqlSubstringExpression(subExps);
+          ret = new BeamSqlOperatorExpression(StringOperators.SUBSTRING, 
subExps);
           break;
         case "OVERLAY":
-          ret = new BeamSqlOverlayExpression(subExps);
+          ret = new BeamSqlOperatorExpression(StringOperators.OVERLAY, 
subExps);
           break;
         case "INITCAP":
-          ret = new BeamSqlInitCapExpression(subExps);
+          ret = new BeamSqlOperatorExpression(StringOperators.INIT_CAP, 
subExps);
           break;
 
         // date functions
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlBinaryOperator.java
similarity index 54%
rename from 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
rename to 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlBinaryOperator.java
index 3c2d139db80..f3458e063f9 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlBinaryOperator.java
@@ -15,26 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
 
-/**
- * 'LOWER' operator.
- */
-public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
-  public BeamSqlLowerExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
+/** A two-argument operator that is applied to already-evaluated arguments. */
+public interface BeamSqlBinaryOperator extends BeamSqlOperator {
+  default BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments) {
+    checkArgument(arguments.size() == 2, "Unary operator %s received more than 
one argument", this);
+    return apply(arguments.get(0), arguments.get(1));
   }
 
-  @Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow 
window) {
-    String str = opValueEvaluated(0, inputRow, window);
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
+  default boolean accept(List<BeamSqlExpression> arguments) {
+    return arguments.size() == 2 &&
+        accept(arguments.get(0), arguments.get(1));
   }
+
+  boolean accept(BeamSqlExpression left, BeamSqlExpression right);
+  BeamSqlPrimitive apply(BeamSqlPrimitive left, BeamSqlPrimitive right);
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperator.java
similarity index 67%
rename from 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java
rename to 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperator.java
index f8fc4beb446..13c60cd5026 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperator.java
@@ -15,8 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
- * String operators.
+ * An operator that is applied to already-evaluated arguments
  */
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+public interface BeamSqlOperator {
+  boolean accept(List<BeamSqlExpression> arguments);
+  SqlTypeName getOutputType();
+  BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments);
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperatorExpression.java
similarity index 58%
rename from 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
rename to 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperatorExpression.java
index b38ff9661c9..bb5c6e42572 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlOperatorExpression.java
@@ -15,26 +15,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
- * 'UPPER' operator.
+ * An expression that evaluates its operands and applies a {@link BeamSqlOperator} to the results.
  */
-public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
-  public BeamSqlUpperExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
+public class BeamSqlOperatorExpression extends BeamSqlExpression {
+
+  private final BeamSqlOperator operator;
+
+  public BeamSqlOperatorExpression(
+      BeamSqlOperator operator, List<BeamSqlExpression> operands) {
+    super(operands, operator.getOutputType());
+    this.operator = operator;
   }
 
-  @Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow 
window) {
-    String str = opValueEvaluated(0, inputRow, window);
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
+  @Override
+  public boolean accept() {
+    return operator.accept(operands);
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
+    List<BeamSqlPrimitive> arguments =
+        operands.stream()
+            .map(operand -> operand.evaluate(inputRow, window))
+            .collect(Collectors.toList());
+
+    return operator.apply(arguments);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUnaryOperator.java
similarity index 58%
rename from 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpression.java
rename to 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUnaryOperator.java
index 1e1b553886e..c1e8ab8dbb6 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUnaryOperator.java
@@ -15,30 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.calcite.sql.type.SqlTypeName;
 
-/**
- * Base class for all string unary operators.
- */
-public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression {
-  public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands, 
SqlTypeName outputType) {
-    super(operands, outputType);
+/** A single-argument operator that is applied to already-evaluated arguments. */
+public interface BeamSqlUnaryOperator extends BeamSqlOperator {
+
+  default BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments) {
+    checkArgument(arguments.size() == 1, "Unary operator %s received more than 
one argument", this);
+    return apply(arguments.get(0));
   }
 
-  @Override public boolean accept() {
-    if (operands.size() != 1) {
-      return false;
-    }
+  default boolean accept(List<BeamSqlExpression> arguments) {
+    return arguments.size() == 1 && accept(arguments.get(0));
+  }
 
-    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))) {
-      return false;
-    }
+  boolean accept(BeamSqlExpression argument);
 
-    return true;
-  }
+  BeamSqlPrimitive apply(BeamSqlPrimitive argument);
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperators.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperators.java
new file mode 100644
index 00000000000..a622ae10caa
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperators.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.List;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlOperator;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUnaryOperator;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/** String operator implementations */
+public class StringOperators {
+
+  public interface StringOperator extends BeamSqlOperator {
+    default SqlTypeName getOutputType() {
+      return SqlTypeName.VARCHAR;
+    }
+  }
+
+  @FunctionalInterface
+  private interface StringUnaryOperator extends BeamSqlUnaryOperator {
+    default boolean accept(BeamSqlExpression arg) {
+      return SqlTypeName.CHAR_TYPES.contains(arg.getOutputType());
+    }
+
+    default SqlTypeName getOutputType() {
+      return SqlTypeName.VARCHAR;
+    }
+  }
+
+  public static final BeamSqlOperator CHAR_LENGTH =
+      (StringUnaryOperator)
+          (BeamSqlPrimitive arg) ->
+              BeamSqlPrimitive.of(SqlTypeName.INTEGER, 
arg.getString().length());
+
+  public static final BeamSqlOperator UPPER =
+      (StringUnaryOperator)
+          (BeamSqlPrimitive arg) ->
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
arg.getString().toUpperCase());
+
+  public static final BeamSqlOperator LOWER =
+      (StringUnaryOperator)
+          (BeamSqlPrimitive arg) ->
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
arg.getString().toLowerCase());
+
+  /** {@code INITCAP}. */
+  public static final BeamSqlOperator INIT_CAP =
+      (StringUnaryOperator)
+          (BeamSqlPrimitive arg) -> {
+            String str = arg.getString();
+
+            StringBuilder ret = new StringBuilder(str);
+            boolean isInit = true;
+            for (int i = 0; i < str.length(); i++) {
+              if (Character.isWhitespace(str.charAt(i))) {
+                isInit = true;
+                continue;
+              }
+
+              if (isInit) {
+                ret.setCharAt(i, Character.toUpperCase(str.charAt(i)));
+                isInit = false;
+              } else {
+                ret.setCharAt(i, Character.toLowerCase(str.charAt(i)));
+              }
+            }
+            return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ret.toString());
+          };
+
+  public static final BeamSqlBinaryOperator CONCAT =
+      new BeamSqlBinaryOperator() {
+        @Override
+        public SqlTypeName getOutputType() {
+          return SqlTypeName.VARCHAR;
+        }
+
+        @Override
+        public boolean accept(BeamSqlExpression left, BeamSqlExpression right) 
{
+          return SqlTypeName.CHAR_TYPES.contains(left.getOutputType())
+              && SqlTypeName.CHAR_TYPES.contains(right.getOutputType());
+        }
+
+        @Override
+        public BeamSqlPrimitive apply(BeamSqlPrimitive left, BeamSqlPrimitive 
right) {
+          String leftString = left.getString();
+          String rightString = right.getString();
+
+          return BeamSqlPrimitive.of(
+              SqlTypeName.VARCHAR,
+              new StringBuilder(leftString.length() + rightString.length())
+                  .append(leftString)
+                  .append(rightString)
+                  .toString());
+        }
+      };
+
+  public static final BeamSqlOperator POSITION =
+      new StringOperator() {
+        @Override
+        public boolean accept(List<BeamSqlExpression> operands) {
+          if (operands.size() < 2 || operands.size() > 3) {
+            return false;
+          }
+
+          return 
SqlTypeName.CHAR_TYPES.contains(operands.get(0).getOutputType())
+              && 
SqlTypeName.CHAR_TYPES.contains(operands.get(1).getOutputType())
+              && ((operands.size() < 3)
+              || 
SqlTypeName.INT_TYPES.contains(operands.get(2).getOutputType()));
+        }
+
+        @Override
+        public BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments) {
+          String targetStr = arguments.get(0).getString();
+          String containingStr = arguments.get(1).getString();
+          int from = arguments.size() < 3 ? -1 : ((Number) 
arguments.get(2).getValue()).intValue();
+
+          int idx = containingStr.indexOf(targetStr, from);
+
+          return BeamSqlPrimitive.of(SqlTypeName.INTEGER, idx);
+        }
+      };
+
+  public static final BeamSqlOperator TRIM =
+      new StringOperator() {
+        @Override
+        public boolean accept(List<BeamSqlExpression> subexpressions) {
+          return (subexpressions.size() == 1
+                  && 
SqlTypeName.CHAR_TYPES.contains(subexpressions.get(0).getOutputType()))
+              || (subexpressions.size() == 3
+                  && 
SqlTypeName.SYMBOL.equals(subexpressions.get(0).getOutputType())
+                  && 
SqlTypeName.CHAR_TYPES.contains(subexpressions.get(1).getOutputType())
+                  && 
SqlTypeName.CHAR_TYPES.contains(subexpressions.get(2).getOutputType()));
+        }
+
+        @Override
+        public BeamSqlPrimitive apply(List<BeamSqlPrimitive> operands) {
+          if (operands.size() == 1) {
+            return BeamSqlPrimitive.of(
+                SqlTypeName.VARCHAR, 
operands.get(0).getValue().toString().trim());
+          } else {
+            SqlTrimFunction.Flag type = (SqlTrimFunction.Flag) 
operands.get(0).getValue();
+            String targetStr = operands.get(1).getString();
+            String containingStr = operands.get(2).getString();
+
+            switch (type) {
+              case LEADING:
+                return BeamSqlPrimitive.of(
+                    SqlTypeName.VARCHAR, leadingTrim(containingStr, 
targetStr));
+              case TRAILING:
+                return BeamSqlPrimitive.of(
+                    SqlTypeName.VARCHAR, trailingTrim(containingStr, 
targetStr));
+              case BOTH:
+              default:
+                return BeamSqlPrimitive.of(
+                    SqlTypeName.VARCHAR,
+                    trailingTrim(leadingTrim(containingStr, targetStr), 
targetStr));
+            }
+          }
+        }
+
+        String leadingTrim(String containingStr, String targetStr) {
+          int idx = 0;
+          while (containingStr.startsWith(targetStr, idx)) {
+            idx += targetStr.length();
+          }
+
+          return containingStr.substring(idx);
+        }
+
+        String trailingTrim(String containingStr, String targetStr) {
+          int idx = containingStr.length() - targetStr.length();
+          while (containingStr.startsWith(targetStr, idx)) {
+            idx -= targetStr.length();
+          }
+
+          idx += targetStr.length();
+          return containingStr.substring(0, idx);
+        }
+      };
+
+  public static final BeamSqlOperator OVERLAY =
+      new StringOperator() {
+        @Override
+        public boolean accept(List<BeamSqlExpression> operands) {
+          return (operands.size() == 3
+              && 
SqlTypeName.CHAR_TYPES.contains(operands.get(0).getOutputType())
+              && 
SqlTypeName.CHAR_TYPES.contains(operands.get(1).getOutputType())
+              && 
SqlTypeName.INT_TYPES.contains(operands.get(2).getOutputType()))
+              || (operands.size() == 4
+              && 
SqlTypeName.CHAR_TYPES.contains(operands.get(0).getOutputType())
+              && 
SqlTypeName.CHAR_TYPES.contains(operands.get(1).getOutputType())
+              && 
SqlTypeName.INT_TYPES.contains(operands.get(2).getOutputType()))
+              && 
SqlTypeName.INT_TYPES.contains(operands.get(3).getOutputType());
+        }
+
+        @Override
+        public BeamSqlPrimitive apply(List<BeamSqlPrimitive> operands) {
+          String str = operands.get(0).getString();
+          String replaceStr = operands.get(1).getString();
+          int idx = operands.get(2).getInteger();
+
+          // the index is 1 based.
+          idx -= 1;
+          int length = replaceStr.length();
+          if (operands.size() == 4) {
+            length = operands.get(3).getInteger();
+          }
+
+          StringBuilder result = new StringBuilder(str.length() + 
replaceStr.length() - length);
+          result
+              .append(str.substring(0, idx))
+              .append(replaceStr)
+              .append(str.substring(idx + length));
+
+          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString());
+        }
+      };
+
+  public static final BeamSqlOperator SUBSTRING =
+      new StringOperator() {
+
+        @Override
+        public boolean accept(List<BeamSqlExpression> operands) {
+          return (operands.size() == 2
+              && 
SqlTypeName.CHAR_TYPES.contains(operands.get(0).getOutputType())
+              && 
SqlTypeName.INT_TYPES.contains(operands.get(1).getOutputType()))
+              || (operands.size() == 3
+                  && 
SqlTypeName.CHAR_TYPES.contains(operands.get(0).getOutputType())
+                  && 
SqlTypeName.INT_TYPES.contains(operands.get(1).getOutputType())
+                  && 
SqlTypeName.INT_TYPES.contains(operands.get(2).getOutputType()));
+        }
+
+        @Override
+        public BeamSqlPrimitive apply(List<BeamSqlPrimitive> operands) {
+          String str = operands.get(0).getString();
+          int idx = operands.get(1).getInteger();
+          int startIdx = idx;
+          if (startIdx > 0) {
+            // NOTE: SQL substring is 1-based (rather than 0-based)
+            startIdx -= 1;
+          } else if (startIdx < 0) {
+            // NOTE: SQL also supports a negative index, counted from the end of the string
+            startIdx += str.length();
+          } else {
+            return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
+          }
+
+          if (operands.size() == 3) {
+            int length = operands.get(2).getInteger();
+            if (length < 0) {
+              length = 0;
+            }
+            int endIdx = Math.min(startIdx + length, str.length());
+            return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
str.substring(startIdx, endIdx));
+          } else {
+            return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
str.substring(startIdx));
+          }
+        }
+      };
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
deleted file mode 100644
index 91828239e33..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'CHAR_LENGTH' operator.
- */
-public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
-  public BeamSqlCharLengthExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.INTEGER);
-  }
-
-  @Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow 
window) {
-    String str = opValueEvaluated(0, inputRow, window);
-    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
deleted file mode 100644
index ffa7ad2409e..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * String concat operator.
- */
-public class BeamSqlConcatExpression extends BeamSqlExpression {
-
-  protected BeamSqlConcatExpression(List<BeamSqlExpression> operands, 
SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  public BeamSqlConcatExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() != 2) {
-      return false;
-    }
-
-    for (BeamSqlExpression exp : getOperands()) {
-      if (!SqlTypeName.CHAR_TYPES.contains(exp.getOutputType())) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow 
window) {
-    String left = opValueEvaluated(0, inputRow, window);
-    String right = opValueEvaluated(1, inputRow, window);
-
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-        new StringBuilder(left.length() + right.length())
-            .append(left).append(right).toString());
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
deleted file mode 100644
index 2468dec99cb..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'INITCAP' operator.
- */
-public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
-  public BeamSqlInitCapExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow 
window) {
-    String str = opValueEvaluated(0, inputRow, window);
-
-    StringBuilder ret = new StringBuilder(str);
-    boolean isInit = true;
-    for (int i = 0; i < str.length(); i++) {
-      if (Character.isWhitespace(str.charAt(i))) {
-        isInit = true;
-        continue;
-      }
-
-      if (isInit) {
-        ret.setCharAt(i, Character.toUpperCase(str.charAt(i)));
-        isInit = false;
-      } else {
-        ret.setCharAt(i, Character.toLowerCase(str.charAt(i)));
-      }
-    }
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ret.toString());
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
deleted file mode 100644
index 63128f973e6..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'OVERLAY' operator.
- *
- * <p>
- *   OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ])
- * </p>
- */
-public class BeamSqlOverlayExpression extends BeamSqlExpression {
-  public BeamSqlOverlayExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() < 3 || operands.size() > 4) {
-      return false;
-    }
-
-    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
-        || !SqlTypeName.CHAR_TYPES.contains(opType(1))
-        || !SqlTypeName.INT_TYPES.contains(opType(2))) {
-      return false;
-    }
-
-    if (operands.size() == 4 && !SqlTypeName.INT_TYPES.contains(opType(3))) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow 
window) {
-    String str = opValueEvaluated(0, inputRow, window);
-    String replaceStr = opValueEvaluated(1, inputRow, window);
-    int idx = opValueEvaluated(2, inputRow, window);
-    // the index is 1 based.
-    idx -= 1;
-    int length = replaceStr.length();
-    if (operands.size() == 4) {
-      length = opValueEvaluated(3, inputRow, window);
-    }
-
-    StringBuilder result = new StringBuilder(
-        str.length() + replaceStr.length() - length);
-    result.append(str.substring(0, idx))
-        .append(replaceStr)
-        .append(str.substring(idx + length));
-
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString());
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
deleted file mode 100644
index 2b0452c033e..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * String position operator.
- *
- * <p>
- *   example:
- *     POSITION(string1 IN string2)
- *     POSITION(string1 IN string2 FROM integer)
- * </p>
- */
-public class BeamSqlPositionExpression extends BeamSqlExpression {
-  public BeamSqlPositionExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.INTEGER);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() < 2 || operands.size() > 3) {
-      return false;
-    }
-
-    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
-        || !SqlTypeName.CHAR_TYPES.contains(opType(1))) {
-      return false;
-    }
-
-    if (operands.size() == 3
-        && !SqlTypeName.INT_TYPES.contains(opType(2))) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow 
window) {
-    String targetStr = opValueEvaluated(0, inputRow, window);
-    String containingStr = opValueEvaluated(1, inputRow, window);
-    int from = -1;
-    if (operands.size() == 3) {
-      Number tmp = opValueEvaluated(2, inputRow, window);
-      from = tmp.intValue();
-    }
-
-    int idx = containingStr.indexOf(targetStr, from);
-
-    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, idx);
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
deleted file mode 100644
index b2194b2ad2d..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'SUBSTRING' operator.
- *
- * <p>
- *   SUBSTRING(string FROM integer)
- *   SUBSTRING(string FROM integer FOR integer)
- * </p>
- */
-public class BeamSqlSubstringExpression extends BeamSqlExpression {
-  public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() < 2 || operands.size() > 3) {
-      return false;
-    }
-
-    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
-        || !SqlTypeName.INT_TYPES.contains(opType(1))) {
-      return false;
-    }
-
-    if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow 
window) {
-    String str = opValueEvaluated(0, inputRow, window);
-    int idx = opValueEvaluated(1, inputRow, window);
-    int startIdx = idx;
-    if (startIdx > 0) {
-      // NOTE: SQL substring is 1 based(rather than 0 based)
-      startIdx -= 1;
-    } else if (startIdx < 0) {
-      // NOTE: SQL also support negative index...
-      startIdx += str.length();
-    } else {
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
-    }
-
-    if (operands.size() == 3) {
-      int length = opValueEvaluated(2, inputRow, window);
-      if (length < 0) {
-        length = 0;
-      }
-      int endIdx = Math.min(startIdx + length, str.length());
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx, 
endIdx));
-    } else {
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx));
-    }
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
deleted file mode 100644
index 2244dd4a7f9..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Trim operator.
- *
- * <p>
- * TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2)
- * </p>
- */
-public class BeamSqlTrimExpression extends BeamSqlExpression {
-  public BeamSqlTrimExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() != 1 && operands.size() != 3) {
-      return false;
-    }
-
-    if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) {
-      return false;
-    }
-
-    if (operands.size() == 3
-        && (
-        SqlTypeName.SYMBOL != opType(0)
-            || !SqlTypeName.CHAR_TYPES.contains(opType(1))
-            || !SqlTypeName.CHAR_TYPES.contains(opType(2)))
-        ) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow 
window) {
-    if (operands.size() == 1) {
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-          opValueEvaluated(0, inputRow, window).toString().trim());
-    } else {
-      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow, window);
-      String targetStr = opValueEvaluated(1, inputRow, window);
-      String containingStr = opValueEvaluated(2, inputRow, window);
-
-      switch (type) {
-        case LEADING:
-          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
leadingTrim(containingStr, targetStr));
-        case TRAILING:
-          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
trailingTrim(containingStr, targetStr));
-        case BOTH:
-        default:
-          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-              trailingTrim(leadingTrim(containingStr, targetStr), targetStr));
-      }
-    }
-  }
-
-  static String leadingTrim(String containingStr, String targetStr) {
-    int idx = 0;
-    while (containingStr.startsWith(targetStr, idx)) {
-      idx += targetStr.length();
-    }
-
-    return containingStr.substring(idx);
-  }
-
-  static String trailingTrim(String containingStr, String targetStr) {
-    int idx = containingStr.length() - targetStr.length();
-    while (containingStr.startsWith(targetStr, idx)) {
-      idx -= targetStr.length();
-    }
-
-    idx += targetStr.length();
-    return containingStr.substring(0, idx);
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
index 477ed866009..d0220761016 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
@@ -47,15 +47,6 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -210,133 +201,6 @@ private void testBuildArithmeticExpression(SqlOperator fn,
     assertTrue(exp.getClass().equals(clazz));
   }
 
-  @Test
-  public void testBuildExpression_string()  {
-    RexNode rexNode;
-    BeamSqlExpression exp;
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CONCAT,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello "),
-            rexBuilder.makeLiteral("world")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlConcatExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello"),
-            rexBuilder.makeLiteral("worldhello")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlPositionExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello"),
-            rexBuilder.makeLiteral("worldhello"),
-            
rexBuilder.makeCast(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
-                rexBuilder.makeBigintLiteral(BigDecimal.ONE))
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlPositionExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlCharLengthExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.UPPER,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlUpperExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOWER,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlLowerExpression);
-
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.INITCAP,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlInitCapExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.TRIM,
-        Arrays.asList(
-            rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlTrimExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlSubstringExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlSubstringExpression);
-
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlOverlayExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlOverlayExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CASE,
-        Arrays.asList(
-            rexBuilder.makeLiteral(true),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO")
-        )
-    );
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlCaseExpression);
-  }
 
   @Test
   public void testBuildExpression_date() {
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperatorsTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperatorsTest.java
new file mode 100644
index 00000000000..fa75d1bbc4a
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperatorsTest.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for the string operators defined in {@link StringOperators}.
+ */
+@RunWith(Enclosed.class)
+public class StringOperatorsTest extends BeamSqlFnExecutorTestBase {
+
+  @RunWith(JUnit4.class)
+  public static class UpperTest {
+    @Test
+    public void testUpper() {
+      assertThat(
+          StringOperators.UPPER
+              .apply(ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
"hello")))
+              .getValue(),
+          equalTo("HELLO"));
+    }
+  }
+
+  @RunWith(JUnit4.class)
+  public static class LowerTest {
+    @Test
+    public void testLower() {
+      assertThat(
+          StringOperators.LOWER
+              .apply(ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
"HELLo")))
+              .getValue(),
+          equalTo("hello"));
+    }
+  }
+
+  @RunWith(JUnit4.class)
+  public static class TrimTest {
+
+    @Test
+    public void testAcceptOne() {
+      assertTrue(
+          StringOperators.TRIM.accept(
+              ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " 
hello "))));
+    }
+
+    @Test
+    public void testAcceptThree() {
+      assertTrue(
+          StringOperators.TRIM.accept(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
SqlTrimFunction.Flag.BOTH),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"))));
+    }
+
+    @Test
+    public void testRejectTwo() {
+      assertFalse(
+          StringOperators.TRIM.accept(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"))));
+    }
+
+    @Test
+    public void testLeading() throws Exception {
+      assertThat(
+          StringOperators.TRIM.apply(
+              ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
SqlTrimFunction.Flag.LEADING),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
"hehe__hehe"))).getValue(),
+          equalTo("__hehe"));
+    }
+
+    @Test
+    public void testTrailing() {
+      assertThat(
+          StringOperators.TRIM.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
SqlTrimFunction.Flag.TRAILING),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
"hehe__hehe"))).getValue(),
+          equalTo("hehe__"));
+    }
+
+
+    @Test
+    public void testBoth() {
+      assertThat(
+          StringOperators.TRIM.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
SqlTrimFunction.Flag.BOTH),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "__"))).getValue(),
+          equalTo("__"));
+    }
+
+
+    @Test
+    public void testDefault() {
+      assertThat(
+          StringOperators.TRIM.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello 
"))).getValue(),
+          equalTo("hello"));
+    }
+  }
+
+  @RunWith(JUnit4.class)
+  public static class CharLengthTest {
+
+    @Test
+    public void testSimple() {
+      assertThat(
+          StringOperators.CHAR_LENGTH
+              .apply(ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
"hello")))
+              .getValue(),
+          equalTo(5));
+    }
+
+    @Test
+    public void testAccept() {
+      assertTrue(
+          StringOperators.CHAR_LENGTH.accept(
+              ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, 
"hello"))));
+    }
+
+    @Test
+    public void testRejectNonString() {
+      assertFalse(
+          StringOperators.CHAR_LENGTH.accept(
+              ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))));
+    }
+
+    @Test
+    public void testRejectTooManyArgs() {
+      assertFalse(
+          StringOperators.CHAR_LENGTH.accept(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"))));
+    }
+  }
+
+  /**
+   * Tests for the CONCAT operator.
+   */
+  @RunWith(JUnit4.class)
+  public static class BeamSqlConcatExpressionTest extends 
BeamSqlFnExecutorTestBase {
+
+    @Test
+    public void accept() throws Exception {
+      assertTrue(StringOperators.CONCAT.accept(
+          ImmutableList.of(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"))));
+    }
+
+    @Test
+    public void rejectNonString() throws Exception {
+      assertFalse(StringOperators.CONCAT.accept(
+          ImmutableList.of(
+              BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"))));
+    }
+
+    @Test
+    public void rejectTooMany() throws Exception {
+      assertFalse(StringOperators.CONCAT.accept(
+          ImmutableList.of(
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"),
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"))));
+    }
+
+    @Test
+    public void testApply() throws Exception {
+      assertThat(
+          StringOperators.CONCAT.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " 
world"))).getValue(),
+          equalTo("hello world"));
+    }
+
+  }
+
+  /**
+   * Test for SUBSTRING.
+   */
+  public static class SubstringTest {
+
+    @Test
+    public void testAcceptTwoArgs() throws Exception {
+      assertTrue(StringOperators.SUBSTRING.accept(
+          ImmutableList.of(
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+              BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))));
+    }
+
+    @Test
+    public void testAcceptThreeArgs() {
+      assertTrue(StringOperators.SUBSTRING.accept(
+          ImmutableList.of(
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+              BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+              BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2))));
+    }
+
+    @Test
+    public void testApplyWhole() {
+      assertThat(
+          StringOperators.SUBSTRING.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))).getValue(),
+          equalTo("hello"));
+    }
+
+    @Test
+    public void testApplySubstring() {
+      assertThat(
+          StringOperators.SUBSTRING.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2))).getValue(),
+          equalTo("he"));
+    }
+
+    @Test
+    public void testApplyExactLength() {
+      assertThat(
+          StringOperators.SUBSTRING.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5))).getValue(),
+          equalTo("hello"));
+    }
+
+    @Test
+    public void testApplyExceedsLength() {
+      assertThat(
+          StringOperators.SUBSTRING.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100))).getValue(),
+          equalTo("hello"));
+    }
+
+    @Test
+    public void testApplyNegativeLength() {
+      assertThat(
+          StringOperators.SUBSTRING.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0))).getValue(),
+          equalTo(""));
+    }
+
+    @Test
+    public void testApplyNegativeEndpoint() {
+      assertThat(
+          StringOperators.SUBSTRING.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1))).getValue(),
+          equalTo(""));
+    }
+
+    @Test
+    public void testApplyNegativeStartpoint() {
+      assertThat(
+          StringOperators.SUBSTRING.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1))).getValue(),
+          equalTo("o"));
+    }
+  }
+
+  /**
+   * Test for POSITION.
+   */
+  public static class PositionTest {
+    @Test
+    public void testAcceptTwoArgs() throws Exception {
+      assertTrue(
+          StringOperators.POSITION.accept(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"))));
+    }
+
+    @Test
+    public void testAcceptTrheeArgs() throws Exception {
+      assertTrue(
+          StringOperators.POSITION.accept(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))));
+    }
+
+    @Test
+    public void testReject() {
+      assertFalse(
+          StringOperators.POSITION.accept(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"))));
+    }
+
+    @Test
+    public void testRejectTwoMany() {
+      assertFalse(
+          StringOperators.POSITION.accept(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))));
+    }
+
+    @Test
+    public void testBasic() {
+      assertThat(
+          StringOperators.POSITION.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"))).getValue(),
+          equalTo(5));
+    }
+
+    @Test
+    public void testThreeArgs() {
+      assertThat(
+          StringOperators.POSITION.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))).getValue(),
+          equalTo(5));
+    }
+
+    @Test
+    public void testThreeArgsNotFound() {
+      assertThat(
+          StringOperators.POSITION.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))).getValue(),
+          equalTo(-1));
+    }
+  }
+
+  /**
+   * Test for BeamSqlOverlayExpression.
+   */
+  public static class OverlayTest {
+
+    @Test
+    public void acceptThreeArgs() {
+      assertTrue(StringOperators.OVERLAY.accept(
+          ImmutableList.of(
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+              BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))));
+    }
+
+    @Test
+    public void acceptFourArgs() {
+      assertTrue(StringOperators.OVERLAY.accept(
+          ImmutableList.of(
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"),
+              BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1),
+              BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2))));
+    }
+
+    @Test
+    public void testOverlayBasic() {
+      assertThat(
+          StringOperators.OVERLAY.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3))).getValue(),
+          equalTo("w3resou3rce"));
+    }
+
+    @Test
+    public void testOverlayFourArgs() {
+      assertThat(
+          StringOperators.OVERLAY.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4))).getValue(),
+          equalTo("w3resou33rce"));
+    }
+
+    @Test
+    public void testOverlayFourArgs2() {
+      assertThat(
+          StringOperators.OVERLAY.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5))).getValue(),
+          equalTo("w3resou3rce"));
+    }
+
+    @Test
+    public void testOverlayBigGap() {
+      assertThat(
+          StringOperators.OVERLAY.apply(
+              ImmutableList.of(
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"),
+                  BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3),
+                  BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7))).getValue(),
+          equalTo("w3resouce"));
+    }
+  }
+
+  /**
+   * Test of BeamSqlInitCapExpression.
+   */
+  @RunWith(JUnit4.class)
+  public static class InitCapTest {
+
+    @Test
+    public void testTwoWords() {
+      assertThat(StringOperators.INIT_CAP.apply(
+          ImmutableList.of(
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello world"))).getValue(),
+          equalTo("Hello World"));
+    }
+
+    @Test
+    public void testTwoWordsWonky() {
+      assertThat(StringOperators.INIT_CAP.apply(
+          ImmutableList.of(
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hEllO wOrld"))).getValue(),
+          equalTo("Hello World"));
+    }
+
+    @Test
+    public void testTwoWordsSpacedOut() {
+      assertThat(StringOperators.INIT_CAP.apply(
+          ImmutableList.of(
+              BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello     world"))).getValue(),
+          equalTo("Hello     World"));
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
deleted file mode 100644
index 876fbfee3f4..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlCharLengthExpression.
- */
-public class BeamSqlCharLengthExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    assertEquals(5,
-        new BeamSqlCharLengthExpression(operands).evaluate(row, null).getValue());
-  }
-
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
deleted file mode 100644
index 085ecd7419d..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlConcatExpression.
- */
-public class BeamSqlConcatExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    assertTrue(new BeamSqlConcatExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    assertFalse(new BeamSqlConcatExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    assertFalse(new BeamSqlConcatExpression(operands).accept());
-  }
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " world"));
-    Assert.assertEquals("hello world",
-        new BeamSqlConcatExpression(operands).evaluate(row, null).getValue());
-  }
-
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
deleted file mode 100644
index c17680bf286..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test of BeamSqlInitCapExpression.
- */
-public class BeamSqlInitCapExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello world"));
-    assertEquals("Hello World",
-        new BeamSqlInitCapExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hEllO wOrld"));
-    assertEquals("Hello World",
-        new BeamSqlInitCapExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello     world"));
-    assertEquals("Hello     World",
-        new BeamSqlInitCapExpression(operands).evaluate(row, null).getValue());
-  }
-
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
deleted file mode 100644
index 9807b977232..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test of BeamSqlLowerExpression.
- */
-public class BeamSqlLowerExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "HELLO"));
-    assertEquals("hello",
-        new BeamSqlLowerExpression(operands).evaluate(row, null).getValue());
-  }
-
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
deleted file mode 100644
index 3e4443d4e2f..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlOverlayExpression.
- */
-public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertTrue(new BeamSqlOverlayExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    assertTrue(new BeamSqlOverlayExpression(operands).accept());
-  }
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
-    Assert.assertEquals("w3resou3rce",
-        new BeamSqlOverlayExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
-    Assert.assertEquals("w3resou33rce",
-        new BeamSqlOverlayExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
-    Assert.assertEquals("w3resou3rce",
-        new BeamSqlOverlayExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7));
-    Assert.assertEquals("w3resouce",
-        new BeamSqlOverlayExpression(operands).evaluate(row, null).getValue());
-  }
-
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java
deleted file mode 100644
index efe650db34f..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlPositionExpression.
- */
-public class BeamSqlPositionExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    assertTrue(new BeamSqlPositionExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertTrue(new BeamSqlPositionExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    assertFalse(new BeamSqlPositionExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertFalse(new BeamSqlPositionExpression(operands).accept());
-  }
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(-1, new BeamSqlPositionExpression(operands).evaluate(row, null).getValue());
-  }
-
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
deleted file mode 100644
index b999ca15e79..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlStringUnaryExpression.
- */
-public class BeamSqlStringUnaryExpressionTest {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    assertTrue(new BeamSqlCharLengthExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertFalse(new BeamSqlCharLengthExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    assertFalse(new BeamSqlCharLengthExpression(operands).accept());
-  }
-
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
deleted file mode 100644
index 4cb06e665ec..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlSubstringExpression.
- */
-public class BeamSqlSubstringExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertTrue(new BeamSqlSubstringExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    assertTrue(new BeamSqlSubstringExpression(operands).accept());
-  }
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals("hello",
-        new BeamSqlSubstringExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    assertEquals("he",
-        new BeamSqlSubstringExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
-    assertEquals("hello",
-        new BeamSqlSubstringExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100));
-    assertEquals("hello",
-        new BeamSqlSubstringExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0));
-    assertEquals("",
-        new BeamSqlSubstringExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
-    assertEquals("",
-        new BeamSqlSubstringExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
-    assertEquals("o",
-        new BeamSqlSubstringExpression(operands).evaluate(row, null).getValue());
-  }
-
-}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java
deleted file mode 100644
index 8db93525ac1..00000000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for BeamSqlTrimExpression.
- */
-public class BeamSqlTrimExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void accept() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
-    assertTrue(new BeamSqlTrimExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
-    assertTrue(new BeamSqlTrimExpression(operands).accept());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
-    assertFalse(new BeamSqlTrimExpression(operands).accept());
-  }
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.LEADING));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
-    Assert.assertEquals("__hehe",
-        new BeamSqlTrimExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.TRAILING));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
-    Assert.assertEquals("hehe__",
-        new BeamSqlTrimExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "__"));
-    Assert.assertEquals("__",
-        new BeamSqlTrimExpression(operands).evaluate(row, null).getValue());
-
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
-    Assert.assertEquals("hello",
-        new BeamSqlTrimExpression(operands).evaluate(row, null).getValue());
-  }
-
-  @Test public void leadingTrim() throws Exception {
-    assertEquals("__hehe",
-        BeamSqlTrimExpression.leadingTrim("hehe__hehe", "he"));
-  }
-
-  @Test public void trailingTrim() throws Exception {
-    assertEquals("hehe__",
-        BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"));
-  }
-
-  @Test public void trim() throws Exception {
-    assertEquals("__",
-        BeamSqlTrimExpression.leadingTrim(
-        BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"), "he"
-        ));
-  }
-}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java
deleted file mode 100644
index 9187b090cdc..00000000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/**
- * Test of BeamSqlUpperExpression.
- */
-public class BeamSqlUpperExpressionTest extends BeamSqlFnExecutorTestBase {
-
-  @Test public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
-    assertEquals("HELLO",
-        new BeamSqlUpperExpression(operands).evaluate(row, null).getValue());
-  }
-
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 104149)
    Time Spent: 1h  (was: 50m)

> SQL operator argument evaluation should have one place where it is managed
> --------------------------------------------------------------------------
>
>                 Key: BEAM-4365
>                 URL: https://issues.apache.org/jira/browse/BEAM-4365
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> The way Beam SQL is factored, each operator has to explicitly ask its 
> argument to be evaluated. This should be handled generically at a higher 
> level. Since the language is pure and terminating, it is fine for them to 
> vary, but given the simplicity of the expression language it makes sense to 
> use simple call-by-value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to