[ 
https://issues.apache.org/jira/browse/BEAM-4365?focusedWorklogId=115612&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-115612
 ]

ASF GitHub Bot logged work on BEAM-4365:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Jun/18 20:03
            Start Date: 25/Jun/18 20:03
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5743: [BEAM-4365] Migrate some date expressions to operators
URL: https://github.com/apache/beam/pull/5743
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 846c7205f21..4a9e96a89b8 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -36,6 +36,7 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.DateOperators;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.StringOperators;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression;
@@ -58,11 +59,8 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentDateExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimeExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
@@ -397,17 +395,20 @@ static BeamSqlExpression buildExpression(RexNode rexNode) 
{
           if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
             return new BeamSqlCeilExpression(subExps);
           } else {
-            return new BeamSqlDateCeilExpression(subExps);
+            return new BeamSqlOperatorExpression(DateOperators.DATETIME_CEIL, 
subExps);
           }
+
         case "FLOOR":
           if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
-            return new BeamSqlFloorExpression(subExps);
+            ret = new BeamSqlFloorExpression(subExps);
           } else {
-            return new BeamSqlDateFloorExpression(subExps);
+            ret = new BeamSqlOperatorExpression(DateOperators.DATETIME_FLOOR, 
subExps);
           }
+          break;
+
         case "EXTRACT_DATE":
         case "EXTRACT":
-          return new BeamSqlExtractExpression(subExps);
+          return new BeamSqlOperatorExpression(DateOperators.EXTRACT, subExps);
 
         case "LOCALTIME":
         case "CURRENT_TIME":
@@ -502,6 +503,8 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
           String.format("%s is not supported yet", 
rexNode.getClass().toString()));
     }
 
+    // TODO: https://issues.apache.org/jira/browse/BEAM-4622
+    // Many paths above do not reach this validation
     if (!ret.accept()) {
       throw new IllegalStateException(
           ret.getClass().getSimpleName() + " does not accept the operands.(" + 
rexNode + ")");
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/DateOperators.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/DateOperators.java
new file mode 100644
index 00000000000..a8e8e66b6d1
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/DateOperators.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.List;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.joda.time.ReadableInstant;
+
+/** Date operator implementations. */
+public class DateOperators {
+
+  /**
+   * Implementation of CEIL(<i>date or time</i> TO <i>unit</i>).
+   *
+   * <p>Only supports months and years.
+   */
+  public static final BeamSqlOperator DATETIME_CEIL =
+      new BeamSqlOperator() {
+        @Override
+        public boolean accept(List<BeamSqlExpression> arguments) {
+          // TODO: https://issues.apache.org/jira/browse/BEAM-4621
+          // return acceptMonthOrLarger((TimeUnitRange) ((BeamSqlPrimitive)
+          // arguments.get(1)).getValue());
+          return true;
+        }
+
+        @Override
+        public SqlTypeName getOutputType() {
+          return SqlTypeName.TIMESTAMP;
+        }
+
+        @Override
+        public BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments) {
+          ReadableInstant date = arguments.get(0).getDate();
+          TimeUnitRange unit = (TimeUnitRange) arguments.get(1).getValue();
+          return BeamSqlPrimitive.of(
+              getOutputType(),
+              new DateTime(
+                  DateTimeUtils.unixTimestampCeil(unit, date.getMillis()), 
date.getZone()));
+        }
+      };
+
+  /**
+   * Implementation of FLOOR(<i>date or time</i> TO <i>unit</i>).
+   *
+   * <p>Only supports months and years.
+   */
+  public static final BeamSqlOperator DATETIME_FLOOR =
+      new BeamSqlOperator() {
+        @Override
+        public boolean accept(List<BeamSqlExpression> arguments) {
+          // TODO: https://issues.apache.org/jira/browse/BEAM-4621
+          // return acceptMonthOrLarger((TimeUnitRange) ((BeamSqlPrimitive)
+          // arguments.get(1)).getValue());
+          return true;
+        }
+
+        @Override
+        public SqlTypeName getOutputType() {
+          return SqlTypeName.TIMESTAMP;
+        }
+
+        @Override
+        public BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments) {
+          ReadableInstant date = arguments.get(0).getDate();
+          TimeUnitRange unit = (TimeUnitRange) arguments.get(1).getValue();
+          return BeamSqlPrimitive.of(
+              getOutputType(),
+              new DateTime(
+                  DateTimeUtils.unixTimestampFloor(unit, date.getMillis()), 
date.getZone()));
+        };
+      };
+
+  private static boolean acceptMonthOrLarger(TimeUnitRange unitRange) {
+    TimeUnit smallestUnit = unitRange.endUnit == null ? unitRange.startUnit : 
unitRange.endUnit;
+    return smallestUnit.multiplier != null
+        && (smallestUnit.multiplier.compareTo(TimeUnit.MONTH.multiplier) >= 0);
+  }
+
+  /**
+   * {@link BeamSqlOperator} for EXTRACT.
+   *
+   * <p>The following date functions also implicitly converted to {@code 
EXTRACT}:
+   *
+   * <ul>
+   *   <li>YEAR(date) =&gt; EXTRACT(YEAR FROM date)
+   *   <li>MONTH(date) =&gt; EXTRACT(MONTH FROM date)
+   *   <li>DAY(date) =&gt; EXTRACT(DAY FROM date)
+   *   <li>QUARTER(date) =&gt; EXTRACT(QUARTER FROM date)
+   *   <li>WEEK(date) =&gt; EXTRACT(WEEK FROM date)
+   *   <li>DAYOFYEAR(date) =&gt; EXTRACT(DOY FROM date)
+   *   <li>DAYOFMONTH(date) =&gt; EXTRACT(DAY FROM date)
+   *   <li>DAYOFWEEK(date) =&gt; EXTRACT(DOW FROM date)
+   *   <li>HOUR(date) =&gt; EXTRACT(HOUR FROM date)
+   *   <li>MINUTE(date) =&gt; EXTRACT(MINUTE FROM date)
+   *   <li>SECOND(date) =&gt; EXTRACT(SECOND FROM date)
+   * </ul>
+   */
+  public static final BeamSqlOperator EXTRACT =
+      new BeamSqlOperator() {
+        @Override
+        public boolean accept(List<BeamSqlExpression> arguments) {
+          return arguments.size() == 2
+              && arguments.get(0).getOutputType() == SqlTypeName.SYMBOL
+              && 
SqlTypeName.DATETIME_TYPES.contains(arguments.get(1).getOutputType());
+        }
+
+        @Override
+        public SqlTypeName getOutputType() {
+          return SqlTypeName.BIGINT;
+        }
+
+        @Override
+        public BeamSqlPrimitive apply(List<BeamSqlPrimitive> arguments) {
+          ReadableInstant time = arguments.get(1).getDate();
+          TimeUnitRange unit = (TimeUnitRange) arguments.get(0).getValue();
+
+          switch (unit) {
+            case YEAR:
+            case QUARTER:
+            case MONTH:
+            case DAY:
+            case DOW:
+            case WEEK:
+            case DOY:
+            case CENTURY:
+            case MILLENNIUM:
+              Long timeByDay = time.getMillis() / DateTimeUtils.MILLIS_PER_DAY;
+              Long extracted = DateTimeUtils.unixDateExtract(unit, timeByDay);
+              return BeamSqlPrimitive.of(getOutputType(), extracted);
+
+            case HOUR:
+            case MINUTE:
+            case SECOND:
+              int timeInDay = (int) (time.getMillis() % 
DateTimeUtils.MILLIS_PER_DAY);
+              extracted = (long) DateTimeUtils.unixTimeExtract(unit, 
timeInDay);
+              return BeamSqlPrimitive.of(getOutputType(), extracted);
+
+            default:
+              throw new UnsupportedOperationException(
+                  "Extract for time unit: " + unit + " not supported!");
+          }
+        }
+      };
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
deleted file mode 100644
index 19c8478c9ef..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.avatica.util.DateTimeUtils;
-import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.DateTime;
-import org.joda.time.ReadableInstant;
-
-/**
- * {@code BeamSqlExpression} for CEIL(date).
- *
- * <p>NOTE: only support CEIL for {@link TimeUnitRange#YEAR} and {@link 
TimeUnitRange#MONTH}.
- */
-public class BeamSqlDateCeilExpression extends BeamSqlExpression {
-  public BeamSqlDateCeilExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.TIMESTAMP);
-  }
-
-  @Override
-  public boolean accept() {
-    return operands.size() == 2 && opType(1) == SqlTypeName.SYMBOL;
-  }
-
-  @Override
-  public BeamSqlPrimitive evaluate(
-      Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
-    ReadableInstant date = (ReadableInstant) opValueEvaluated(0, inputRow, 
window, env);
-    long time = date.getMillis();
-    TimeUnitRange unit = (TimeUnitRange) opValueEvaluated(1, inputRow, window, 
env);
-
-    long newTime = DateTimeUtils.unixTimestampCeil(unit, time);
-    DateTime newDate = new DateTime(newTime, date.getZone());
-
-    return BeamSqlPrimitive.of(outputType, newDate);
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
deleted file mode 100644
index 9cc74ca69ea..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.avatica.util.DateTimeUtils;
-import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.DateTime;
-import org.joda.time.ReadableInstant;
-
-/**
- * {@code BeamSqlExpression} for FLOOR(date).
- *
- * <p>NOTE: only support FLOOR for {@link TimeUnitRange#YEAR} and {@link 
TimeUnitRange#MONTH}.
- */
-public class BeamSqlDateFloorExpression extends BeamSqlExpression {
-  public BeamSqlDateFloorExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.DATE);
-  }
-
-  @Override
-  public boolean accept() {
-    return operands.size() == 2 && opType(1) == SqlTypeName.SYMBOL;
-  }
-
-  @Override
-  public BeamSqlPrimitive evaluate(
-      Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
-    ReadableInstant date = (ReadableInstant) opValueEvaluated(0, inputRow, 
window, env);
-    long time = date.getMillis();
-    TimeUnitRange unit = (TimeUnitRange) opValueEvaluated(1, inputRow, window, 
env);
-
-    long newTime = DateTimeUtils.unixTimestampFloor(unit, time);
-
-    DateTime newDate = new DateTime(newTime, date.getZone());
-    return BeamSqlPrimitive.of(outputType, newDate);
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
deleted file mode 100644
index 5f9285eba72..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.avatica.util.DateTimeUtils;
-import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.ReadableInstant;
-
-/**
- * {@code BeamSqlExpression} for EXTRACT.
- *
- * <p>The following date functions also implicitly converted to {@code 
EXTRACT}:
- *
- * <ul>
- *   <li>YEAR(date) =&gt; EXTRACT(YEAR FROM date)
- *   <li>MONTH(date) =&gt; EXTRACT(MONTH FROM date)
- *   <li>DAY(date) =&gt; EXTRACT(DAY FROM date)
- *   <li>QUARTER(date) =&gt; EXTRACT(QUARTER FROM date)
- *   <li>WEEK(date) =&gt; EXTRACT(WEEK FROM date)
- *   <li>DAYOFYEAR(date) =&gt; EXTRACT(DOY FROM date)
- *   <li>DAYOFMONTH(date) =&gt; EXTRACT(DAY FROM date)
- *   <li>DAYOFWEEK(date) =&gt; EXTRACT(DOW FROM date)
- *   <li>HOUR(date) =&gt; EXTRACT(HOUR FROM date)
- *   <li>MINUTE(date) =&gt; EXTRACT(MINUTE FROM date)
- *   <li>SECOND(date) =&gt; EXTRACT(SECOND FROM date)
- * </ul>
- */
-public class BeamSqlExtractExpression extends BeamSqlExpression {
-  public BeamSqlExtractExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.BIGINT);
-  }
-
-  @Override
-  public boolean accept() {
-    return operands.size() == 2 && opType(1) == SqlTypeName.TIMESTAMP;
-  }
-
-  @Override
-  public BeamSqlPrimitive evaluate(
-      Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
-    ReadableInstant time = (ReadableInstant) opValueEvaluated(1, inputRow, 
window, env);
-
-    TimeUnitRange unit = (TimeUnitRange) opValueEvaluated(0, inputRow, window, 
env);
-
-    switch (unit) {
-      case YEAR:
-      case QUARTER:
-      case MONTH:
-      case DAY:
-      case DOW:
-      case WEEK:
-      case DOY:
-      case CENTURY:
-      case MILLENNIUM:
-        Long timeByDay = time.getMillis() / DateTimeUtils.MILLIS_PER_DAY;
-        Long extracted = DateTimeUtils.unixDateExtract(unit, timeByDay);
-        return BeamSqlPrimitive.of(outputType, extracted);
-
-      case HOUR:
-      case MINUTE:
-      case SECOND:
-        int timeInDay = (int) (time.getMillis() % 
DateTimeUtils.MILLIS_PER_DAY);
-        extracted = (long) DateTimeUtils.unixTimeExtract(unit, timeInDay);
-        return BeamSqlPrimitive.of(outputType, extracted);
-
-      default:
-        throw new UnsupportedOperationException(
-            "Extract for time unit: " + unit + " not supported!");
-    }
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
index 9ea66899a89..04cb9a57774 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
@@ -41,9 +41,12 @@
 import 
org.apache.beam.sdk.extensions.sql.integrationtest.BeamSqlBuiltinFunctionsIntegrationTestBase;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /**
  * DSL compliance tests for the row-level operators of {@link
@@ -51,6 +54,8 @@
  */
 public class BeamSqlDslSqlStdOperatorsTest extends 
BeamSqlBuiltinFunctionsIntegrationTestBase {
 
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
   /** Calcite operators are identified by name and kind. */
   @AutoValue
   abstract static class SqlOperatorId {
@@ -71,14 +76,24 @@ private static SqlOperatorId sqlOperatorId(SqlOperatorTest 
annotation) {
     return sqlOperatorId(annotation.name(), 
SqlKind.valueOf(annotation.kind()));
   }
 
+  private static SqlOperatorId sqlOperatorId(SqlOperator sqlOperator) {
+    return sqlOperatorId(sqlOperator.getName(), sqlOperator.getKind());
+  }
+
   private static final List<SqlOperatorId> NON_ROW_OPERATORS =
       ImmutableList.of(
-          sqlOperatorId("UNION"),
-          sqlOperatorId("UNION ALL", SqlKind.UNION),
-          sqlOperatorId("EXCEPT"),
-          sqlOperatorId("EXCEPT ALL", SqlKind.EXCEPT),
-          sqlOperatorId("INTERSECT"),
-          sqlOperatorId("INTERSECT ALL", SqlKind.INTERSECT));
+              SqlStdOperatorTable.ELEMENT_SLICE, // internal
+              SqlStdOperatorTable.EXCEPT,
+              SqlStdOperatorTable.EXCEPT_ALL,
+              SqlStdOperatorTable.INTERSECT,
+              SqlStdOperatorTable.INTERSECT_ALL,
+              SqlStdOperatorTable.LITERAL_CHAIN, // internal
+              SqlStdOperatorTable.PATTERN_CONCAT, // "," PATTERN_CONCAT
+              SqlStdOperatorTable.UNION,
+              SqlStdOperatorTable.UNION_ALL)
+          .stream()
+          .map(op -> sqlOperatorId(op))
+          .collect(Collectors.toList());
 
   /**
    * LEGACY ADAPTER - DO NOT USE DIRECTLY. Use {@code 
getAnnotationsByType(SqlOperatorTest.class)},
@@ -171,7 +186,10 @@ public void testThatAllOperatorsAreTested() {
       // Sorting is just to make failures more readable until we have 100% 
coverage
       List<SqlOperatorId> untestedList = Lists.newArrayList(untestedOperators);
       untestedList.sort(orderByNameThenKind);
-      fail("No tests declared for operators:\n\t" + 
Joiner.on("\n\t").join(untestedList));
+      fail(
+          String.format(
+              "No tests declared for %s operators:\n\t%s",
+              untestedList.size(), Joiner.on("\n\t").join(untestedList)));
     }
   }
 
@@ -253,4 +271,221 @@ public void testArrayFunctions() {
 
     checker.buildRunAndCheck();
   }
+
+  @Test
+  @SqlOperatorTest(name = "DAYOFMONTH", kind = "OTHER")
+  @SqlOperatorTest(name = "DAYOFWEEK", kind = "OTHER")
+  @SqlOperatorTest(name = "DAYOFYEAR", kind = "OTHER")
+  @SqlOperatorTest(name = "EXTRACT", kind = "EXTRACT")
+  @SqlOperatorTest(name = "YEAR", kind = "OTHER")
+  @SqlOperatorTest(name = "QUARTER", kind = "OTHER")
+  @SqlOperatorTest(name = "MONTH", kind = "OTHER")
+  @SqlOperatorTest(name = "WEEK", kind = "OTHER")
+  @SqlOperatorTest(name = "HOUR", kind = "OTHER")
+  @SqlOperatorTest(name = "MINUTE", kind = "OTHER")
+  @SqlOperatorTest(name = "SECOND", kind = "OTHER")
+  public void testBasicDateTimeFunctions() {
+    ExpressionChecker checker =
+        new ExpressionChecker()
+            .addExpr("EXTRACT(YEAR FROM ts)", 1986L)
+            .addExpr("YEAR(ts)", 1986L)
+            .addExpr("QUARTER(ts)", 1L)
+            .addExpr("MONTH(ts)", 2L)
+            .addExpr("WEEK(ts)", 7L)
+            .addExpr("DAYOFMONTH(ts)", 15L)
+            .addExpr("DAYOFYEAR(ts)", 46L)
+            .addExpr("DAYOFWEEK(ts)", 7L)
+            .addExpr("HOUR(ts)", 11L)
+            .addExpr("MINUTE(ts)", 35L)
+            .addExpr("SECOND(ts)", 26L);
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  // More needed @SqlOperatorTest(name = "FLOOR", kind = "FLOOR")
+  // More needed @SqlOperatorTest(name = "CEIL", kind = "CEIL")
+  public void testFloorAndCeil() {
+    ExpressionChecker checker =
+        new ExpressionChecker()
+            .addExpr("FLOOR(ts TO MONTH)", parseDate("1986-02-01 00:00:00"))
+            .addExpr("FLOOR(ts TO YEAR)", parseDate("1986-01-01 00:00:00"))
+            .addExpr("CEIL(ts TO MONTH)", parseDate("1986-03-01 00:00:00"))
+            .addExpr("CEIL(ts TO YEAR)", parseDate("1987-01-01 00:00:00"));
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  @Ignore("https://issues.apache.org/jira/browse/BEAM-4622")
+  public void testFloorAndCeilResolutionLimit() {
+    thrown.expect(IllegalArgumentException.class);
+    ExpressionChecker checker =
+        new ExpressionChecker().addExpr("FLOOR(ts TO DAY)", 
parseDate("1986-02-01 00:00:00"));
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  @SqlOperatorTest(name = "TIMESTAMPADD", kind = "TIMESTAMP_ADD")
+  public void testDatetimePlusFunction() {
+    ExpressionChecker checker =
+        new ExpressionChecker()
+            .addExpr(
+                "TIMESTAMPADD(SECOND, 3, TIMESTAMP '1984-04-19 01:02:03')",
+                parseDate("1984-04-19 01:02:06"))
+            .addExpr(
+                "TIMESTAMPADD(MINUTE, 3, TIMESTAMP '1984-04-19 01:02:03')",
+                parseDate("1984-04-19 01:05:03"))
+            .addExpr(
+                "TIMESTAMPADD(HOUR, 3, TIMESTAMP '1984-04-19 01:02:03')",
+                parseDate("1984-04-19 04:02:03"))
+            .addExpr(
+                "TIMESTAMPADD(DAY, 3, TIMESTAMP '1984-04-19 01:02:03')",
+                parseDate("1984-04-22 01:02:03"))
+            .addExpr(
+                "TIMESTAMPADD(MONTH, 2, TIMESTAMP '1984-01-19 01:02:03')",
+                parseDate("1984-03-19 01:02:03"))
+            .addExpr(
+                "TIMESTAMPADD(YEAR, 2, TIMESTAMP '1985-01-19 01:02:03')",
+                parseDate("1987-01-19 01:02:03"));
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  @SqlOperatorTest(name = "DATETIME_PLUS", kind = "PLUS")
+  public void testDatetimeInfixPlus() {
+    ExpressionChecker checker =
+        new ExpressionChecker()
+            .addExpr(
+                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '3' SECOND",
+                parseDate("1984-01-19 01:02:06"))
+            .addExpr(
+                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MINUTE",
+                parseDate("1984-01-19 01:04:03"))
+            .addExpr(
+                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' HOUR",
+                parseDate("1984-01-19 03:02:03"))
+            .addExpr(
+                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' DAY",
+                parseDate("1984-01-21 01:02:03"))
+            .addExpr(
+                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MONTH",
+                parseDate("1984-03-19 01:02:03"))
+            .addExpr(
+                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' YEAR",
+                parseDate("1986-01-19 01:02:03"));
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  @SqlOperatorTest(name = "TIMESTAMPDIFF", kind = "TIMESTAMP_DIFF")
+  public void testTimestampDiff() {
+    ExpressionChecker checker =
+        new ExpressionChecker()
+            .addExpr(
+                "TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-19 01:01:58')",
+                0)
+            .addExpr(
+                "TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-19 01:01:59')",
+                1)
+            .addExpr(
+                "TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-19 01:02:00')",
+                2)
+            .addExpr(
+                "TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-19 01:02:57')",
+                0)
+            .addExpr(
+                "TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-19 01:02:58')",
+                1)
+            .addExpr(
+                "TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-19 01:03:58')",
+                2)
+            .addExpr(
+                "TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-19 02:01:57')",
+                0)
+            .addExpr(
+                "TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-19 02:01:58')",
+                1)
+            .addExpr(
+                "TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-19 03:01:58')",
+                2)
+            .addExpr(
+                "TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-20 01:01:57')",
+                0)
+            .addExpr(
+                "TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-20 01:01:58')",
+                1)
+            .addExpr(
+                "TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
+                    + "TIMESTAMP '1984-04-21 01:01:58')",
+                2)
+            .addExpr(
+                "TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
+                    + "TIMESTAMP '1984-02-19 01:01:57')",
+                0)
+            .addExpr(
+                "TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
+                    + "TIMESTAMP '1984-02-19 01:01:58')",
+                1)
+            .addExpr(
+                "TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
+                    + "TIMESTAMP '1984-03-19 01:01:58')",
+                2)
+            .addExpr(
+                "TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+                    + "TIMESTAMP '1982-01-19 01:01:57')",
+                0)
+            .addExpr(
+                "TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+                    + "TIMESTAMP '1982-01-19 01:01:58')",
+                1)
+            .addExpr(
+                "TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+                    + "TIMESTAMP '1983-01-19 01:01:58')",
+                2)
+            .addExpr(
+                "TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+                    + "TIMESTAMP '1980-01-19 01:01:58')",
+                -1)
+            .addExpr(
+                "TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
+                    + "TIMESTAMP '1979-01-19 01:01:58')",
+                -2);
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  // More needed @SqlOperatorTest(name = "-", kind = "MINUS")
+  public void testTimestampMinusInterval() throws Exception {
+    ExpressionChecker checker =
+        new ExpressionChecker()
+            .addExpr(
+                "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '2' SECOND",
+                parseDate("1984-04-19 01:01:56"))
+            .addExpr(
+                "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '1' MINUTE",
+                parseDate("1984-04-19 01:00:58"))
+            .addExpr(
+                "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '4' HOUR",
+                parseDate("1984-04-18 21:01:58"))
+            .addExpr(
+                "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '5' DAY",
+                parseDate("1984-04-14 01:01:58"))
+            .addExpr(
+                "TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '2' MONTH",
+                parseDate("1983-11-19 01:01:58"))
+            .addExpr(
+                "TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '1' YEAR",
+                parseDate("1983-01-19 01:01:58"));
+    checker.buildRunAndCheck();
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
index 0d50c696d7e..97c7a282640 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
@@ -35,17 +35,13 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentDateExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimeExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
 import org.apache.calcite.avatica.util.TimeUnit;
-import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexLiteral;
@@ -153,33 +149,6 @@ public void testBuildExpression_date() {
     calendar.setTimeZone(TimeZone.getTimeZone("GMT"));
     calendar.setTime(new Date());
 
-    // CEIL
-    rexNode =
-        rexBuilder.makeCall(
-            SqlStdOperatorTable.CEIL,
-            Arrays.asList(
-                rexBuilder.makeDateLiteral(calendar), 
rexBuilder.makeFlag(TimeUnitRange.MONTH)));
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlDateCeilExpression);
-
-    // FLOOR
-    rexNode =
-        rexBuilder.makeCall(
-            SqlStdOperatorTable.FLOOR,
-            Arrays.asList(
-                rexBuilder.makeDateLiteral(calendar), 
rexBuilder.makeFlag(TimeUnitRange.MONTH)));
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlDateFloorExpression);
-
-    // EXTRACT == EXTRACT_DATE?
-    rexNode =
-        rexBuilder.makeCall(
-            SqlStdOperatorTable.EXTRACT,
-            Arrays.asList(
-                rexBuilder.makeFlag(TimeUnitRange.MONTH), 
rexBuilder.makeDateLiteral(calendar)));
-    exp = BeamSqlFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlExtractExpression);
-
     // CURRENT_DATE
     rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CURRENT_DATE, 
Arrays.asList());
     exp = BeamSqlFnExecutor.buildExpression(rexNode);
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
deleted file mode 100644
index ec82a8e42a8..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
-
-import java.util.ArrayList;
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironments;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/** Test for {@code BeamSqlDateCeilExpression}. */
-public class BeamSqlDateCeilExpressionTest extends 
BeamSqlDateExpressionTestBase {
-  @Test
-  public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, 
str2DateTime("2017-05-22 09:10:11")));
-    // YEAR
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
-    Assert.assertEquals(
-        str2DateTime("2018-01-01 00:00:00"),
-        new BeamSqlDateCeilExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.row, null, 
BeamSqlExpressionEnvironments.empty())
-            .getDate());
-
-    operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
TimeUnitRange.MONTH));
-    Assert.assertEquals(
-        str2DateTime("2017-06-01 00:00:00"),
-        new BeamSqlDateCeilExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.row, null, 
BeamSqlExpressionEnvironments.empty())
-            .getDate());
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
deleted file mode 100644
index 4a0581fc5bc..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironments;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Test;
-
-/** Test for {@code BeamSqlDateFloorExpression}. */
-public class BeamSqlDateFloorExpressionTest extends 
BeamSqlDateExpressionTestBase {
-  @Test
-  public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, 
str2DateTime("2017-05-22 09:10:11")));
-    // YEAR
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
-    assertEquals(
-        str2DateTime("2017-01-01 00:00:00"),
-        new BeamSqlDateFloorExpression(operands)
-            .evaluate(row, null, BeamSqlExpressionEnvironments.empty())
-            .getDate());
-    // MONTH
-    operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
TimeUnitRange.MONTH));
-    assertEquals(
-        str2DateTime("2017-05-01 00:00:00"),
-        new BeamSqlDateFloorExpression(operands)
-            .evaluate(row, null, BeamSqlExpressionEnvironments.empty())
-            .getDate());
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
deleted file mode 100644
index 905c540134e..00000000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironments;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.DateTime;
-import org.junit.Test;
-
-/** Test for {@code BeamSqlExtractExpression}. */
-public class BeamSqlExtractExpressionTest extends 
BeamSqlDateExpressionTestBase {
-  @Test
-  public void evaluate() throws Exception {
-    List<BeamSqlExpression> operands = new ArrayList<>();
-    DateTime time = str2DateTime("2017-05-22 16:17:18");
-
-    // YEAR
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, time));
-    assertEquals(
-        2017L,
-        new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.row, null, 
BeamSqlExpressionEnvironments.empty())
-            .getValue());
-
-    // MONTH
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, time));
-    assertEquals(
-        5L,
-        new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.row, null, 
BeamSqlExpressionEnvironments.empty())
-            .getValue());
-
-    // DAY
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DAY));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, time));
-    assertEquals(
-        22L,
-        new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.row, null, 
BeamSqlExpressionEnvironments.empty())
-            .getValue());
-
-    // DAY_OF_WEEK
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DOW));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, time));
-    assertEquals(
-        2L,
-        new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.row, null, 
BeamSqlExpressionEnvironments.empty())
-            .getValue());
-
-    // DAY_OF_YEAR
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DOY));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, time));
-    assertEquals(
-        142L,
-        new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.row, null, 
BeamSqlExpressionEnvironments.empty())
-            .getValue());
-
-    // WEEK
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.WEEK));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, time));
-    assertEquals(
-        21L,
-        new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.row, null, 
BeamSqlExpressionEnvironments.empty())
-            .getValue());
-
-    // QUARTER
-    operands.clear();
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
TimeUnitRange.QUARTER));
-    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, time));
-    assertEquals(
-        2L,
-        new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.row, null, 
BeamSqlExpressionEnvironments.empty())
-            .getValue());
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
index 95c8a87928b..a9d50ab7a9d 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java
@@ -33,188 +33,6 @@
 /** Integration test for date functions. */
 public class BeamSqlDateFunctionsIntegrationTest
     extends BeamSqlBuiltinFunctionsIntegrationTestBase {
-  @Test
-  public void testBasicDateTimeFunctions() throws Exception {
-    ExpressionChecker checker =
-        new ExpressionChecker()
-            .addExpr("EXTRACT(YEAR FROM ts)", 1986L)
-            .addExpr("YEAR(ts)", 1986L)
-            .addExpr("QUARTER(ts)", 1L)
-            .addExpr("MONTH(ts)", 2L)
-            .addExpr("WEEK(ts)", 7L)
-            .addExpr("DAYOFMONTH(ts)", 15L)
-            .addExpr("DAYOFYEAR(ts)", 46L)
-            .addExpr("DAYOFWEEK(ts)", 7L)
-            .addExpr("HOUR(ts)", 11L)
-            .addExpr("MINUTE(ts)", 35L)
-            .addExpr("SECOND(ts)", 26L)
-            .addExpr("FLOOR(ts TO YEAR)", parseDate("1986-01-01 00:00:00"))
-            .addExpr("CEIL(ts TO YEAR)", parseDate("1987-01-01 00:00:00"));
-    checker.buildRunAndCheck();
-  }
-
-  @Test
-  public void testDatetimePlusFunction() throws Exception {
-    ExpressionChecker checker =
-        new ExpressionChecker()
-            .addExpr(
-                "TIMESTAMPADD(SECOND, 3, TIMESTAMP '1984-04-19 01:02:03')",
-                parseDate("1984-04-19 01:02:06"))
-            .addExpr(
-                "TIMESTAMPADD(MINUTE, 3, TIMESTAMP '1984-04-19 01:02:03')",
-                parseDate("1984-04-19 01:05:03"))
-            .addExpr(
-                "TIMESTAMPADD(HOUR, 3, TIMESTAMP '1984-04-19 01:02:03')",
-                parseDate("1984-04-19 04:02:03"))
-            .addExpr(
-                "TIMESTAMPADD(DAY, 3, TIMESTAMP '1984-04-19 01:02:03')",
-                parseDate("1984-04-22 01:02:03"))
-            .addExpr(
-                "TIMESTAMPADD(MONTH, 2, TIMESTAMP '1984-01-19 01:02:03')",
-                parseDate("1984-03-19 01:02:03"))
-            .addExpr(
-                "TIMESTAMPADD(YEAR, 2, TIMESTAMP '1985-01-19 01:02:03')",
-                parseDate("1987-01-19 01:02:03"));
-    checker.buildRunAndCheck();
-  }
-
-  @Test
-  public void testDatetimeInfixPlus() throws Exception {
-    ExpressionChecker checker =
-        new ExpressionChecker()
-            .addExpr(
-                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '3' SECOND",
-                parseDate("1984-01-19 01:02:06"))
-            .addExpr(
-                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MINUTE",
-                parseDate("1984-01-19 01:04:03"))
-            .addExpr(
-                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' HOUR",
-                parseDate("1984-01-19 03:02:03"))
-            .addExpr(
-                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' DAY",
-                parseDate("1984-01-21 01:02:03"))
-            .addExpr(
-                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MONTH",
-                parseDate("1984-03-19 01:02:03"))
-            .addExpr(
-                "TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' YEAR",
-                parseDate("1986-01-19 01:02:03"));
-    checker.buildRunAndCheck();
-  }
-
-  @Test
-  public void testTimestampDiff() throws Exception {
-    ExpressionChecker checker =
-        new ExpressionChecker()
-            .addExpr(
-                "TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-19 01:01:58')",
-                0)
-            .addExpr(
-                "TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-19 01:01:59')",
-                1)
-            .addExpr(
-                "TIMESTAMPDIFF(SECOND, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-19 01:02:00')",
-                2)
-            .addExpr(
-                "TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-19 01:02:57')",
-                0)
-            .addExpr(
-                "TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-19 01:02:58')",
-                1)
-            .addExpr(
-                "TIMESTAMPDIFF(MINUTE, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-19 01:03:58')",
-                2)
-            .addExpr(
-                "TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-19 02:01:57')",
-                0)
-            .addExpr(
-                "TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-19 02:01:58')",
-                1)
-            .addExpr(
-                "TIMESTAMPDIFF(HOUR, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-19 03:01:58')",
-                2)
-            .addExpr(
-                "TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-20 01:01:57')",
-                0)
-            .addExpr(
-                "TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-20 01:01:58')",
-                1)
-            .addExpr(
-                "TIMESTAMPDIFF(DAY, TIMESTAMP '1984-04-19 01:01:58', "
-                    + "TIMESTAMP '1984-04-21 01:01:58')",
-                2)
-            .addExpr(
-                "TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
-                    + "TIMESTAMP '1984-02-19 01:01:57')",
-                0)
-            .addExpr(
-                "TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
-                    + "TIMESTAMP '1984-02-19 01:01:58')",
-                1)
-            .addExpr(
-                "TIMESTAMPDIFF(MONTH, TIMESTAMP '1984-01-19 01:01:58', "
-                    + "TIMESTAMP '1984-03-19 01:01:58')",
-                2)
-            .addExpr(
-                "TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
-                    + "TIMESTAMP '1982-01-19 01:01:57')",
-                0)
-            .addExpr(
-                "TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
-                    + "TIMESTAMP '1982-01-19 01:01:58')",
-                1)
-            .addExpr(
-                "TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
-                    + "TIMESTAMP '1983-01-19 01:01:58')",
-                2)
-            .addExpr(
-                "TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
-                    + "TIMESTAMP '1980-01-19 01:01:58')",
-                -1)
-            .addExpr(
-                "TIMESTAMPDIFF(YEAR, TIMESTAMP '1981-01-19 01:01:58', "
-                    + "TIMESTAMP '1979-01-19 01:01:58')",
-                -2);
-    checker.buildRunAndCheck();
-  }
-
-  @Test
-  public void testTimestampMinusInterval() throws Exception {
-    ExpressionChecker checker =
-        new ExpressionChecker()
-            .addExpr(
-                "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '2' SECOND",
-                parseDate("1984-04-19 01:01:56"))
-            .addExpr(
-                "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '1' MINUTE",
-                parseDate("1984-04-19 01:00:58"))
-            .addExpr(
-                "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '4' HOUR",
-                parseDate("1984-04-18 21:01:58"))
-            .addExpr(
-                "TIMESTAMP '1984-04-19 01:01:58' - INTERVAL '5' DAY",
-                parseDate("1984-04-14 01:01:58"))
-            .addExpr(
-                "TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '2' MONTH",
-                parseDate("1983-11-19 01:01:58"))
-            .addExpr(
-                "TIMESTAMP '1984-01-19 01:01:58' - INTERVAL '1' YEAR",
-                parseDate("1983-01-19 01:01:58"));
-    checker.buildRunAndCheck();
-  }
-
   @Test
   public void testDateTimeFunctions_currentTime() throws Exception {
     String sql =


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 115612)
    Time Spent: 4h  (was: 3h 50m)

> SQL operator argument evaluation should have one place where it is managed
> --------------------------------------------------------------------------
>
>                 Key: BEAM-4365
>                 URL: https://issues.apache.org/jira/browse/BEAM-4365
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> The way Beam SQL is factored, each operator has to explicitly ask for its
> arguments to be evaluated. This should be handled generically at a higher
> level. Since the language is pure and terminating, it is fine for evaluation
> strategies to vary, but given the simplicity of the expression language it
> makes sense to use simple call-by-value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to