Repository: beam
Updated Branches:
  refs/heads/DSL_SQL f7ee8d33e -> 4ea38d823


[BEAM-2158] Implement the arithmetic operators


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a376f37b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a376f37b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a376f37b

Branch: refs/heads/DSL_SQL
Commit: a376f37b908828fa0d44b5d71822d51b9c47f971
Parents: f7ee8d3
Author: James Xu <[email protected]>
Authored: Thu May 4 22:15:02 2017 +0800
Committer: Jean-Baptiste Onofré <[email protected]>
Committed: Tue May 9 10:23:45 2017 +0200

----------------------------------------------------------------------
 .../dsls/sql/interpreter/BeamSQLFnExecutor.java |  21 +-
 .../arithmetic/BeamSqlArithmeticExpression.java | 101 ++++++++
 .../arithmetic/BeamSqlDivideExpression.java     |  40 +++
 .../arithmetic/BeamSqlMinusExpression.java      |  40 +++
 .../arithmetic/BeamSqlModExpression.java        |  40 +++
 .../arithmetic/BeamSqlMultiplyExpression.java   |  40 +++
 .../arithmetic/BeamSqlPlusExpression.java       |  40 +++
 .../operator/arithmetic/package-info.java       |  22 ++
 .../sql/interpreter/BeamSQLFnExecutorTest.java  |  55 +++-
 .../BeamSqlArithmeticExpressionTest.java        | 251 +++++++++++++++++++
 10 files changed, 637 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a376f37b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
index 32e2ffc..78663f8 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
@@ -33,6 +33,11 @@ import 
org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
 import org.apache.beam.dsls.sql.rel.BeamFilterRel;
 import org.apache.beam.dsls.sql.rel.BeamProjectRel;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
@@ -74,7 +79,7 @@ public class BeamSQLFnExecutor implements 
BeamSQLExpressionExecutor {
    * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} 
recursively,
    * and represent each {@link SqlOperator} with a corresponding {@link 
BeamSqlExpression}.
    */
-  private BeamSqlExpression buildExpression(RexNode rexNode) {
+  static BeamSqlExpression buildExpression(RexNode rexNode) {
     if (rexNode instanceof RexLiteral) {
       RexLiteral node = (RexLiteral) rexNode;
       return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
@@ -107,12 +112,24 @@ public class BeamSQLFnExecutor implements 
BeamSQLExpressionExecutor {
         case "<=":
           return new BeamSqlLessThanEqualExpression(subExps);
 
+        // arithmetic operators
+        case "+":
+          return new BeamSqlPlusExpression(subExps);
+        case "-":
+          return new BeamSqlMinusExpression(subExps);
+        case "*":
+          return new BeamSqlMultiplyExpression(subExps);
+        case "/":
+          return new BeamSqlDivideExpression(subExps);
+        case "MOD":
+          return new BeamSqlModExpression(subExps);
+
         case "IS NULL":
           return new BeamSqlIsNullExpression(subExps.get(0));
         case "IS NOT NULL":
           return new BeamSqlIsNotNullExpression(subExps.get(0));
       default:
-        throw new BeamSqlUnsupportedException();
+        throw new BeamSqlUnsupportedException("Operator: " + opName + " not 
supported yet!");
       }
     } else {
       throw new BeamSqlUnsupportedException(

http://git-wip-us.apache.org/repos/asf/beam/blob/a376f37b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
new file mode 100644
index 0000000..5e1d068
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for all binary arithmetic operators.
+ *
+ * <p>If both operands are declared with an integer type the calculation is
+ * done on {@code Long} values and the result is a
+ * {@link SqlTypeName#BIGINT}; otherwise both operand values are converted to
+ * {@code double} and the result is a {@link SqlTypeName#DOUBLE}.
+ */
+public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
+  private BeamSqlArithmeticExpression(List<BeamSqlExpression> operands,
+      SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
+    // The output type depends on the operand types, so it cannot be fixed
+    // here; ANY is only a placeholder.
+    super(operands, SqlTypeName.ANY);
+  }
+
+  /**
+   * Accepts exactly two operands, both declared with a numeric type.
+   */
+  @Override public boolean accept() {
+    if (operands.size() != 2) {
+      return false;
+    }
+
+    for (BeamSqlExpression operand : operands) {
+      if (!SqlTypeName.NUMERIC_TYPES.contains(operand.getOutputType())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Evaluates both operands against {@code inputRecord} and applies the
+   * operator.
+   *
+   * <p>The type promotion was modelled on
+   * https://dev.mysql.com/doc/refman/5.7/en/arithmetic-functions.html.
+   * Note that integer operands stay on the {@code Long} path for every
+   * operator, so what '/' and MOD return for two integers is decided by the
+   * subclass's {@link #calc(Long, Long)}.
+   *
+   * @param inputRecord the row the operands are evaluated against
+   * @return a BIGINT primitive when both operands are declared with an
+   *     integer type, a DOUBLE primitive otherwise
+   * @throws IllegalStateException if an operand on the double path does not
+   *     evaluate to a {@link Number}
+   */
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(
+      BeamSQLRow inputRecord) {
+    BeamSqlExpression leftOp = operands.get(0);
+    BeamSqlExpression rightOp = operands.get(1);
+
+    // Both operands are INT_TYPES (byte, short, integer, long): calculate
+    // as Long, whatever the operator is.
+    if (SqlTypeName.INT_TYPES.contains(leftOp.getOutputType())
+        && SqlTypeName.INT_TYPES.contains(rightOp.getOutputType())) {
+      Long leftValue = getLong(inputRecord, leftOp);
+      Long rightValue = getLong(inputRecord, rightOp);
+      Long ret = calc(leftValue, rightValue);
+      return BeamSqlPrimitive.of(SqlTypeName.BIGINT, ret);
+    }
+
+    // At least one operand is not declared as an integer type: calculate
+    // as Double. The boxed doubles select calc(Number, Number).
+    double leftValue = getDouble(inputRecord, leftOp);
+    double rightValue = getDouble(inputRecord, rightOp);
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE,
+        calc(leftValue, rightValue));
+  }
+
+  /**
+   * Evaluates {@code op} and returns its value as a {@code Long}.
+   *
+   * <p>{@link Number} values are converted with {@link Number#longValue()}
+   * rather than by parsing their string form, which fails for values such
+   * as a {@code BigDecimal} printed as {@code 1E+3}. Any other value is
+   * parsed from its string form.
+   */
+  private Long getLong(BeamSQLRow inputRecord, BeamSqlExpression op) {
+    Object raw = op.evaluate(inputRecord).getValue();
+    if (raw instanceof Number) {
+      return ((Number) raw).longValue();
+    }
+    return Long.valueOf(raw.toString());
+  }
+
+  /**
+   * Evaluates {@code op} and returns its value as a {@code double}.
+   *
+   * <p>The decision is made on the runtime value, not on the declared
+   * output type: an operand may be declared with the ANY placeholder (this
+   * class itself passes ANY to its super constructor) and still evaluate
+   * to a number.
+   *
+   * @throws IllegalStateException if the operand value is not a
+   *     {@link Number}
+   */
+  private double getDouble(BeamSQLRow inputRecord, BeamSqlExpression op) {
+    Object raw = op.evaluate(inputRecord).getValue();
+    if (!(raw instanceof Number)) {
+      throw new IllegalStateException("Operand of type "
+          + op.getOutputType() + " evaluated to a non-numeric value: " + raw);
+    }
+    return ((Number) raw).doubleValue();
+  }
+
+  /**
+   * Calculation used when both operands are {@link SqlTypeName#INT_TYPES};
+   * called for every operator ('+', '-', '*', '/', MOD).
+   */
+  public abstract Long calc(Long left, Long right);
+
+
+  /**
+   * Calculation used for all other operand combinations; {@code evaluate}
+   * passes both operands as {@code Double}.
+   */
+  public abstract Double calc(Number left, Number right);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a376f37b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
new file mode 100644
index 0000000..c23f54c
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * The '/' operator: left operand divided by right operand.
+ */
+public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlDivideExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /** Divides using {@code long} arithmetic. */
+  @Override public Long calc(Long left, Long right) {
+    long quotient = left.longValue() / right.longValue();
+    return Long.valueOf(quotient);
+  }
+
+  /** Divides the {@code double} values of both operands. */
+  @Override public Double calc(Number left, Number right) {
+    double dividend = left.doubleValue();
+    double divisor = right.doubleValue();
+    return Double.valueOf(dividend / divisor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a376f37b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
new file mode 100644
index 0000000..c6d7ca0
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * The '-' operator: right operand subtracted from left operand.
+ */
+public class BeamSqlMinusExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlMinusExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /** Subtracts using {@code long} arithmetic. */
+  @Override public Long calc(Long left, Long right) {
+    long difference = left.longValue() - right.longValue();
+    return Long.valueOf(difference);
+  }
+
+  /** Subtracts the {@code double} values of both operands. */
+  @Override public Double calc(Number left, Number right) {
+    double minuend = left.doubleValue();
+    double subtrahend = right.doubleValue();
+    return Double.valueOf(minuend - subtrahend);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a376f37b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
new file mode 100644
index 0000000..6323e95
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * The MOD operator, implemented with Java's '%'.
+ */
+public class BeamSqlModExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlModExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /** Remainder using {@code long} arithmetic. */
+  @Override public Long calc(Long left, Long right) {
+    long remainder = left.longValue() % right.longValue();
+    return Long.valueOf(remainder);
+  }
+
+  /** Remainder of the {@code double} values of both operands. */
+  @Override public Double calc(Number left, Number right) {
+    double dividend = left.doubleValue();
+    double divisor = right.doubleValue();
+    return Double.valueOf(dividend % divisor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a376f37b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
new file mode 100644
index 0000000..42ba4a5
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * The '*' operator: product of both operands.
+ */
+public class BeamSqlMultiplyExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlMultiplyExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /** Multiplies using {@code long} arithmetic. */
+  @Override public Long calc(Long left, Long right) {
+    long product = left.longValue() * right.longValue();
+    return Long.valueOf(product);
+  }
+
+  /** Multiplies the {@code double} values of both operands. */
+  @Override public Double calc(Number left, Number right) {
+    double first = left.doubleValue();
+    double second = right.doubleValue();
+    return Double.valueOf(first * second);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a376f37b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
new file mode 100644
index 0000000..59be053
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * The '+' operator: sum of both operands.
+ */
+public class BeamSqlPlusExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlPlusExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /** Adds using {@code long} arithmetic. */
+  @Override public Long calc(Long left, Long right) {
+    long sum = left.longValue() + right.longValue();
+    return Long.valueOf(sum);
+  }
+
+  /** Adds the {@code double} values of both operands. */
+  @Override public Double calc(Number left, Number right) {
+    double first = left.doubleValue();
+    double second = right.doubleValue();
+    return Double.valueOf(first + second);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a376f37b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java
new file mode 100644
index 0000000..b8f2175
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Arithmetic operators.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;

http://git-wip-us.apache.org/repos/asf/beam/blob/a376f37b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
index abbe3f7..8df0865 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
@@ -17,19 +17,28 @@
  */
 package org.apache.beam.dsls.sql.interpreter;
 
+import static org.junit.Assert.assertTrue;
+
 import java.math.BigDecimal;
 import java.util.Arrays;
+
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
 import org.apache.beam.dsls.sql.rel.BeamFilterRel;
 import org.apache.beam.dsls.sql.rel.BeamProjectRel;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
@@ -60,27 +69,27 @@ public class BeamSQLFnExecutorTest extends 
BeamSQLFnExecutorTestBase {
     Assert.assertEquals(1, executor.exps.size());
 
     BeamSqlExpression l1Exp = executor.exps.get(0);
-    Assert.assertTrue(l1Exp instanceof BeamSqlAndExpression);
+    assertTrue(l1Exp instanceof BeamSqlAndExpression);
     Assert.assertEquals(SqlTypeName.BOOLEAN, l1Exp.getOutputType());
 
     Assert.assertEquals(2, l1Exp.getOperands().size());
     BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0);
     BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1);
 
-    Assert.assertTrue(l1Left instanceof BeamSqlLessThanEqualExpression);
-    Assert.assertTrue(l1Right instanceof BeamSqlEqualExpression);
+    assertTrue(l1Left instanceof BeamSqlLessThanEqualExpression);
+    assertTrue(l1Right instanceof BeamSqlEqualExpression);
 
     Assert.assertEquals(2, l1Left.getOperands().size());
     BeamSqlExpression l1LeftLeft = (BeamSqlExpression) 
l1Left.getOperands().get(0);
     BeamSqlExpression l1LeftRight = (BeamSqlExpression) 
l1Left.getOperands().get(1);
-    Assert.assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression);
-    Assert.assertTrue(l1LeftRight instanceof BeamSqlPrimitive);
+    assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression);
+    assertTrue(l1LeftRight instanceof BeamSqlPrimitive);
 
     Assert.assertEquals(2, l1Right.getOperands().size());
     BeamSqlExpression l1RightLeft = (BeamSqlExpression) 
l1Right.getOperands().get(0);
     BeamSqlExpression l1RightRight = (BeamSqlExpression) 
l1Right.getOperands().get(1);
-    Assert.assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression);
-    Assert.assertTrue(l1RightRight instanceof BeamSqlPrimitive);
+    assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression);
+    assertTrue(l1RightRight instanceof BeamSqlPrimitive);
   }
 
   @Test
@@ -92,10 +101,34 @@ public class BeamSQLFnExecutorTest extends 
BeamSQLFnExecutorTestBase {
 
     executor.prepare();
     Assert.assertEquals(4, executor.exps.size());
-    Assert.assertTrue(executor.exps.get(0) instanceof 
BeamSqlInputRefExpression);
-    Assert.assertTrue(executor.exps.get(1) instanceof 
BeamSqlInputRefExpression);
-    Assert.assertTrue(executor.exps.get(2) instanceof 
BeamSqlInputRefExpression);
-    Assert.assertTrue(executor.exps.get(3) instanceof 
BeamSqlInputRefExpression);
+    assertTrue(executor.exps.get(0) instanceof BeamSqlInputRefExpression);
+    assertTrue(executor.exps.get(1) instanceof BeamSqlInputRefExpression);
+    assertTrue(executor.exps.get(2) instanceof BeamSqlInputRefExpression);
+    assertTrue(executor.exps.get(3) instanceof BeamSqlInputRefExpression);
   }
 
+
+  /**
+   * Runs {@code testBuildArithmeticExpression} once per arithmetic operator,
+   * pairing the operator with the expression class expected for it.
+   */
+  @Test
+  public void testBuildExpression_arithmetic() {
+    testBuildArithmeticExpression(
+        SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class);
+    testBuildArithmeticExpression(
+        SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class);
+    testBuildArithmeticExpression(
+        SqlStdOperatorTable.MULTIPLY, BeamSqlMultiplyExpression.class);
+    testBuildArithmeticExpression(
+        SqlStdOperatorTable.DIVIDE, BeamSqlDivideExpression.class);
+    testBuildArithmeticExpression(
+        SqlStdOperatorTable.MOD, BeamSqlModExpression.class);
+  }
+
+  /**
+   * Builds a call of {@code fn} on two BIGINT literals, passes it to
+   * {@code BeamSQLFnExecutor.buildExpression} and checks that the result is
+   * an instance of exactly {@code clazz}.
+   *
+   * @param fn the operator to build the call for
+   * @param clazz the expression class the operator is expected to map to
+   */
+  private void testBuildArithmeticExpression(SqlOperator fn,
+      Class<? extends BeamSqlExpression> clazz) {
+    RexNode rexNode = rexBuilder.makeCall(fn,
+        Arrays.asList(
+            rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
+            rexBuilder.makeBigintLiteral(new BigDecimal(1L))
+        )
+    );
+    BeamSqlExpression exp = BeamSQLFnExecutor.buildExpression(rexNode);
+
+    // assertEquals reports both the expected and the actual class when the
+    // mapping is wrong; assertTrue(equals) would only report "false".
+    Assert.assertEquals(clazz, exp.getClass());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a376f37b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
new file mode 100644
index 0000000..abebf17
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamSqlArithmeticExpression}.
+ */
+public class BeamSqlArithmeticExpressionTest extends BeamSQLFnExecutorTestBase 
{
+
+  @Test public void testAccept_normal() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // byte, short => accepted
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TINYINT, Byte.valueOf("1")));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.MAX_VALUE));
+    assertTrue(new BeamSqlPlusExpression(operands).accept());
+
+    // integer, long => accepted
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertTrue(new BeamSqlPlusExpression(operands).accept());
+
+    // float, double => accepted
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
+    assertTrue(new BeamSqlPlusExpression(operands).accept());
+
+    // float, varchar => rejected
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "1"));
+    assertFalse(new BeamSqlPlusExpression(operands).accept());
+  }
+
+  @Test public void testAccept_exception() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // more than 2 operands => accept() is expected to return false
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TINYINT, Byte.valueOf("1")));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.MAX_VALUE));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.MAX_VALUE));
+    assertFalse(new BeamSqlPlusExpression(operands).accept());
+
+    // byte, boolean => accept() is expected to return false
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TINYINT, Byte.valueOf("1")));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    assertFalse(new BeamSqlPlusExpression(operands).accept());
+  }
+
+  @Test public void testPlus() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // integer + integer => long
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(2L, new 
BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+    // integer + long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new 
BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+    // long + long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new 
BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+    // float + long => double (expected value widens the float to double first)
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(Double.valueOf(Double.valueOf(1.1F) + 1),
+        new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+    // double + long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2.1, new 
BeamSqlPlusExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testMinus() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // integer - integer => long
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(1L, new 
BeamSqlMinusExpression(operands).evaluate(record).getValue());
+
+    // integer - long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(1L, new 
BeamSqlMinusExpression(operands).evaluate(record).getValue());
+
+    // long - long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(1L, new 
BeamSqlMinusExpression(operands).evaluate(record).getValue());
+
+    // float - long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(Double.valueOf(Double.valueOf(2.1F) - 1),
+        new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+
+    // double - long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(1.1, new 
BeamSqlMinusExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testMultiply() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // integer * integer => long
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(2L, new 
BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+    // integer * long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new 
BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+    // long * long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new 
BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+    // float * long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(Double.valueOf(Double.valueOf(2.1F) * 1),
+        new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+    // double * long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2.1, new 
BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testDivide() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // integer / integer => long
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(2L, new 
BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+    // integer / long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new 
BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+    // long / long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new 
BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+    // float / long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(Double.valueOf(Double.valueOf(2.1F) / 1),
+        new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+    // double / long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2.1, new 
BeamSqlDivideExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testMod() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // integer % integer => long
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertEquals(1L, new 
BeamSqlModExpression(operands).evaluate(record).getValue());
+
+    // integer % long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    assertEquals(1L, new 
BeamSqlModExpression(operands).evaluate(record).getValue());
+
+    // long % long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    assertEquals(1L, new 
BeamSqlModExpression(operands).evaluate(record).getValue());
+
+    // float % long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 3.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    assertEquals(Double.valueOf(Double.valueOf(3.1F) % 2),
+        new BeamSqlModExpression(operands).evaluate(record).getValue());
+
+    // double % long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 3.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    assertEquals(1.1, new 
BeamSqlModExpression(operands).evaluate(record).getValue());
+  }
+}

Reply via email to