[ 
https://issues.apache.org/jira/browse/BEAM-4723?focusedWorklogId=137625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137625
 ]

ASF GitHub Bot logged work on BEAM-4723:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Aug/18 23:48
            Start Date: 23/Aug/18 23:48
    Worklog Time Spent: 10m 
      Work Description: apilloud closed pull request #5926: [BEAM-4723] [SQL] Support datetime type minus time interval
URL: https://github.com/apache/beam/pull/5926
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
index d7bbf990e6b..08935aa91bb 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpression.java
@@ -18,16 +18,13 @@
 
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
 
-import com.google.common.collect.ImmutableMap;
 import java.util.List;
-import java.util.Map;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.DurationFieldType;
 
 /**
  * Infix '-' operation for timestamps.
@@ -46,17 +43,6 @@
  * yet.
  */
 public class BeamSqlDatetimeMinusExpression extends BeamSqlExpression {
-
-  static final Map<SqlTypeName, DurationFieldType> INTERVALS_DURATIONS_TYPES =
-      ImmutableMap.<SqlTypeName, DurationFieldType>builder()
-          .put(SqlTypeName.INTERVAL_SECOND, DurationFieldType.seconds())
-          .put(SqlTypeName.INTERVAL_MINUTE, DurationFieldType.minutes())
-          .put(SqlTypeName.INTERVAL_HOUR, DurationFieldType.hours())
-          .put(SqlTypeName.INTERVAL_DAY, DurationFieldType.days())
-          .put(SqlTypeName.INTERVAL_MONTH, DurationFieldType.months())
-          .put(SqlTypeName.INTERVAL_YEAR, DurationFieldType.years())
-          .build();
-
   private BeamSqlExpression delegateExpression;
 
   public BeamSqlDatetimeMinusExpression(List<BeamSqlExpression> operands, 
SqlTypeName outputType) {
@@ -67,11 +53,12 @@ public 
BeamSqlDatetimeMinusExpression(List<BeamSqlExpression> operands, SqlTypeN
 
   private BeamSqlExpression createDelegateExpression(
       List<BeamSqlExpression> operands, SqlTypeName outputType) {
-
     if (isTimestampMinusTimestamp(operands, outputType)) {
       return new BeamSqlTimestampMinusTimestampExpression(operands, 
outputType);
     } else if (isTimestampMinusInterval(operands, outputType)) {
       return new BeamSqlTimestampMinusIntervalExpression(operands, outputType);
+    } else if (isDatetimeMinusInterval(operands, outputType)) {
+      return new BeamSqlDatetimeMinusIntervalExpression(operands, outputType);
     }
 
     return null;
@@ -89,6 +76,12 @@ private boolean isTimestampMinusInterval(
     return BeamSqlTimestampMinusIntervalExpression.accept(operands, 
outputType);
   }
 
+  private boolean isDatetimeMinusInterval(
+      List<BeamSqlExpression> operands, SqlTypeName outputType) {
+
+    return BeamSqlDatetimeMinusIntervalExpression.accept(operands, outputType);
+  }
+
   @Override
   public boolean accept() {
     return delegateExpression != null && delegateExpression.accept();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusIntervalExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusIntervalExpression.java
new file mode 100644
index 00000000000..91a23928a2e
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusIntervalExpression.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import java.math.BigDecimal;
+import java.util.List;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Period;
+
+/** minus ('-') operator for 'datetime - interval' expressions. */
+public class BeamSqlDatetimeMinusIntervalExpression extends BeamSqlExpression {
+  public BeamSqlDatetimeMinusIntervalExpression(
+      List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override
+  public boolean accept() {
+    return accept(operands, outputType);
+  }
+
+  static boolean accept(List<BeamSqlExpression> operands, SqlTypeName 
outputType) {
+    return operands.size() == 2
+        && SqlTypeName.TIMESTAMP.equals(outputType)
+        && SqlTypeName.DATETIME_TYPES.contains(operands.get(0).getOutputType())
+        && 
TimeUnitUtils.INTERVALS_DURATIONS_TYPES.containsKey(operands.get(1).getOutputType());
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(
+      Row row, BoundedWindow window, BeamSqlExpressionEnvironment env) {
+    DateTime date = new DateTime(opValueEvaluated(0, row, window, env));
+    Period period = intervalToPeriod(op(1).evaluate(row, window, env));
+
+    return BeamSqlPrimitive.of(outputType, date.minus(period));
+  }
+
+  private Period intervalToPeriod(BeamSqlPrimitive operand) {
+    BigDecimal intervalValue = operand.getDecimal();
+    SqlTypeName intervalType = operand.getOutputType();
+
+    int numberOfIntervals =
+        intervalValue
+            .divide(TimeUnitUtils.timeUnitInternalMultiplier(intervalType))
+            .intValueExact();
+
+    return new Period().withField(durationFieldType(intervalType), 
numberOfIntervals);
+  }
+
+  private static DurationFieldType durationFieldType(SqlTypeName 
intervalTypeToCount) {
+    return TimeUnitUtils.INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java
index ced9755a2f1..1e2121dce88 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java
@@ -21,10 +21,8 @@
 import static 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.TimeUnitUtils.timeUnitInternalMultiplier;
 import static 
org.apache.beam.sdk.extensions.sql.impl.utils.SqlTypeUtils.findExpressionOfType;
 
-import com.google.common.collect.ImmutableSet;
 import java.math.BigDecimal;
 import java.util.List;
-import java.util.Set;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
@@ -40,16 +38,6 @@
  * <p>Input and output are expected to be of type TIMESTAMP.
  */
 public class BeamSqlDatetimePlusExpression extends BeamSqlExpression {
-
-  private static final Set<SqlTypeName> SUPPORTED_INTERVAL_TYPES =
-      ImmutableSet.of(
-          SqlTypeName.INTERVAL_SECOND,
-          SqlTypeName.INTERVAL_MINUTE,
-          SqlTypeName.INTERVAL_HOUR,
-          SqlTypeName.INTERVAL_DAY,
-          SqlTypeName.INTERVAL_MONTH,
-          SqlTypeName.INTERVAL_YEAR);
-
   public BeamSqlDatetimePlusExpression(List<BeamSqlExpression> operands) {
     super(operands, SqlTypeName.TIMESTAMP);
   }
@@ -59,7 +47,7 @@ public BeamSqlDatetimePlusExpression(List<BeamSqlExpression> 
operands) {
   public boolean accept() {
     return operands.size() == 2
         && SqlTypeName.DATETIME_TYPES.contains(operands.get(0).getOutputType())
-        && SUPPORTED_INTERVAL_TYPES.contains(operands.get(1).getOutputType());
+        && 
TimeUnitUtils.INTERVALS_DURATIONS_TYPES.containsKey(operands.get(1).getOutputType());
   }
 
   /**
@@ -95,7 +83,7 @@ private int getIntervalMultiplier(BeamSqlPrimitive 
intervalOperandPrimitive) {
 
   private BeamSqlPrimitive getIntervalOperand(
       Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
-    return findExpressionOfType(operands, SUPPORTED_INTERVAL_TYPES)
+    return findExpressionOfType(operands, 
TimeUnitUtils.INTERVALS_DURATIONS_TYPES.keySet())
         .get()
         .evaluate(inputRow, window, env);
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java
index aa02fbdce03..5f53c8d28db 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpression.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
 
-import static 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression.INTERVALS_DURATIONS_TYPES;
-
 import java.math.BigDecimal;
 import java.util.List;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
@@ -38,7 +36,6 @@
  * <p>See {@link BeamSqlDatetimeMinusExpression} for other kinds of datetime 
types subtraction.
  */
 public class BeamSqlTimestampMinusIntervalExpression extends BeamSqlExpression 
{
-
   public BeamSqlTimestampMinusIntervalExpression(
       List<BeamSqlExpression> operands, SqlTypeName outputType) {
     super(operands, outputType);
@@ -53,7 +50,7 @@ static boolean accept(List<BeamSqlExpression> operands, 
SqlTypeName outputType)
     return operands.size() == 2
         && SqlTypeName.TIMESTAMP.equals(outputType)
         && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
-        && 
INTERVALS_DURATIONS_TYPES.containsKey(operands.get(1).getOutputType());
+        && 
TimeUnitUtils.INTERVALS_DURATIONS_TYPES.containsKey(operands.get(1).getOutputType());
   }
 
   @Override
@@ -78,6 +75,6 @@ private Period intervalToPeriod(BeamSqlPrimitive operand) {
   }
 
   private static DurationFieldType durationFieldType(SqlTypeName 
intervalTypeToCount) {
-    return INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+    return TimeUnitUtils.INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java
index 14dfb31a7b8..ff2033d85d9 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusTimestampExpression.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
 
-import static 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression.INTERVALS_DURATIONS_TYPES;
-
 import java.util.List;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
@@ -57,7 +55,7 @@ public boolean accept() {
   }
 
   static boolean accept(List<BeamSqlExpression> operands, SqlTypeName 
intervalType) {
-    return INTERVALS_DURATIONS_TYPES.containsKey(intervalType)
+    return TimeUnitUtils.INTERVALS_DURATIONS_TYPES.containsKey(intervalType)
         && operands.size() == 2
         && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType())
         && SqlTypeName.TIMESTAMP.equals(operands.get(1).getOutputType());
@@ -90,11 +88,11 @@ private long numberOfIntervalsBetweenDates(DateTime 
timestampStart, DateTime tim
   }
 
   private static DurationFieldType durationFieldType(SqlTypeName 
intervalTypeToCount) {
-    if (!INTERVALS_DURATIONS_TYPES.containsKey(intervalTypeToCount)) {
+    if 
(!TimeUnitUtils.INTERVALS_DURATIONS_TYPES.containsKey(intervalTypeToCount)) {
       throw new IllegalArgumentException(
           "Counting " + intervalTypeToCount.getName() + "s between dates is 
not supported");
     }
 
-    return INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
+    return TimeUnitUtils.INTERVALS_DURATIONS_TYPES.get(intervalTypeToCount);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java
index f35d38ffb4e..3174c925f68 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java
@@ -18,12 +18,25 @@
 
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
 
+import com.google.common.collect.ImmutableMap;
 import java.math.BigDecimal;
+import java.util.Map;
 import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DurationFieldType;
 
 /** Utils to convert between Calcite's TimeUnit and Sql intervals. */
 public abstract class TimeUnitUtils {
+  /** supported interval and duration type. */
+  public static final Map<SqlTypeName, DurationFieldType> 
INTERVALS_DURATIONS_TYPES =
+      ImmutableMap.<SqlTypeName, DurationFieldType>builder()
+          .put(SqlTypeName.INTERVAL_SECOND, DurationFieldType.seconds())
+          .put(SqlTypeName.INTERVAL_MINUTE, DurationFieldType.minutes())
+          .put(SqlTypeName.INTERVAL_HOUR, DurationFieldType.hours())
+          .put(SqlTypeName.INTERVAL_DAY, DurationFieldType.days())
+          .put(SqlTypeName.INTERVAL_MONTH, DurationFieldType.months())
+          .put(SqlTypeName.INTERVAL_YEAR, DurationFieldType.years())
+          .build();
 
   /**
    * @return internal multiplier of a TimeUnit, e.g. YEAR is 12, MINUTE is 
60000
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
index 5f5ab6ff748..688663fa960 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusExpressionTest.java
@@ -46,6 +46,8 @@
   private static final BeamSqlPrimitive TIMESTAMP =
       BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, DATE);
 
+  private static final BeamSqlPrimitive DATE1 = 
BeamSqlPrimitive.of(SqlTypeName.DATE, DATE);
+
   private static final BeamSqlPrimitive TIMESTAMP_MINUS_2_SEC =
       BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, DATE_MINUS_2_SEC);
 
@@ -130,6 +132,18 @@ public void testEvaluateTimestampMinusInteval() {
     assertEquals(DATE_MINUS_2_SEC, subtractionResult.getDate());
   }
 
+  @Test
+  public void testEvaluateDateMinusInteval() {
+    BeamSqlDatetimeMinusExpression minusExpression =
+        minusExpression(SqlTypeName.TIMESTAMP, DATE1, INTERVAL_2_SEC);
+
+    BeamSqlPrimitive subtractionResult =
+        minusExpression.evaluate(NULL_ROW, NULL_WINDOW, 
BeamSqlExpressionEnvironments.empty());
+
+    assertEquals(SqlTypeName.TIMESTAMP, subtractionResult.getOutputType());
+    assertEquals(DATE_MINUS_2_SEC, subtractionResult.getDate());
+  }
+
   private static BeamSqlDatetimeMinusExpression minusExpression(
       SqlTypeName outputType, BeamSqlExpression... operands) {
     return new BeamSqlDatetimeMinusExpression(Arrays.asList(operands), 
outputType);
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusIntervalExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusIntervalExpressionTest.java
new file mode 100644
index 00000000000..05d0ad81ca2
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimeMinusIntervalExpressionTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironments;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Unit tests for {@link BeamSqlDatetimeMinusIntervalExpression}. */
+public class BeamSqlDatetimeMinusIntervalExpressionTest {
+  private static final Row NULL_ROW = null;
+  private static final BoundedWindow NULL_WINDOW = null;
+
+  private static final DateTime DATE = new DateTime(329281L);
+  private static final DateTime DATE_MINUS_2_SEC = DATE.minusSeconds(2);
+
+  private static final BeamSqlPrimitive DATETIME = 
BeamSqlPrimitive.of(SqlTypeName.DATE, DATE);
+
+  private static final BeamSqlPrimitive INTERVAL_2_SEC =
+      BeamSqlPrimitive.of(
+          SqlTypeName.INTERVAL_SECOND, TimeUnit.SECOND.multiplier.multiply(new 
BigDecimal(2)));
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testAcceptsHappyPath() {
+    BeamSqlDatetimeMinusIntervalExpression minusExpression =
+        minusExpression(SqlTypeName.TIMESTAMP, DATETIME, INTERVAL_2_SEC);
+
+    assertTrue(minusExpression.accept());
+  }
+
+  @Test
+  public void testDoesNotAcceptWrongOutputType() {
+    Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+    unsupportedTypes.remove(SqlTypeName.TIMESTAMP);
+
+    for (SqlTypeName unsupportedType : unsupportedTypes) {
+      BeamSqlDatetimeMinusIntervalExpression minusExpression =
+          minusExpression(unsupportedType, DATETIME, INTERVAL_2_SEC);
+
+      assertFalse(minusExpression.accept());
+    }
+  }
+
+  @Test
+  public void testDoesNotAcceptWrongFirstOperand() {
+    Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+    unsupportedTypes.removeAll(SqlTypeName.DATETIME_TYPES);
+
+    for (SqlTypeName unsupportedType : unsupportedTypes) {
+      BeamSqlPrimitive unsupportedOperand = mock(BeamSqlPrimitive.class);
+      doReturn(unsupportedType).when(unsupportedOperand).getOutputType();
+
+      BeamSqlDatetimeMinusIntervalExpression minusExpression =
+          minusExpression(SqlTypeName.TIMESTAMP, unsupportedOperand, 
INTERVAL_2_SEC);
+
+      assertFalse(minusExpression.accept());
+    }
+  }
+
+  @Test
+  public void testDoesNotAcceptWrongSecondOperand() {
+    Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
+    
unsupportedTypes.removeAll(TimeUnitUtils.INTERVALS_DURATIONS_TYPES.keySet());
+
+    for (SqlTypeName unsupportedType : unsupportedTypes) {
+      BeamSqlPrimitive unsupportedOperand = mock(BeamSqlPrimitive.class);
+      doReturn(unsupportedType).when(unsupportedOperand).getOutputType();
+
+      BeamSqlDatetimeMinusIntervalExpression minusExpression =
+          minusExpression(SqlTypeName.TIMESTAMP, DATETIME, unsupportedOperand);
+
+      assertFalse(minusExpression.accept());
+    }
+  }
+
+  @Test
+  public void testAcceptsAllSupportedIntervalTypes() {
+    for (SqlTypeName unsupportedType : 
TimeUnitUtils.INTERVALS_DURATIONS_TYPES.keySet()) {
+      BeamSqlPrimitive unsupportedOperand = mock(BeamSqlPrimitive.class);
+      doReturn(unsupportedType).when(unsupportedOperand).getOutputType();
+
+      BeamSqlDatetimeMinusIntervalExpression minusExpression =
+          minusExpression(SqlTypeName.TIMESTAMP, DATETIME, unsupportedOperand);
+
+      assertTrue(minusExpression.accept());
+    }
+  }
+
+  @Test
+  public void testEvaluateHappyPath() {
+    BeamSqlDatetimeMinusIntervalExpression minusExpression =
+        minusExpression(SqlTypeName.TIMESTAMP, DATETIME, INTERVAL_2_SEC);
+
+    BeamSqlPrimitive subtractionResult =
+        minusExpression.evaluate(NULL_ROW, NULL_WINDOW, 
BeamSqlExpressionEnvironments.empty());
+
+    assertEquals(SqlTypeName.TIMESTAMP, subtractionResult.getOutputType());
+    assertEquals(DATE_MINUS_2_SEC, subtractionResult.getDate());
+  }
+
+  private static BeamSqlDatetimeMinusIntervalExpression minusExpression(
+      SqlTypeName intervalsToCount, BeamSqlExpression... operands) {
+    return new BeamSqlDatetimeMinusIntervalExpression(Arrays.asList(operands), 
intervalsToCount);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java
index eb31415c817..ef8fe78f23d 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlTimestampMinusIntervalExpressionTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date;
 
-import static 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression.INTERVALS_DURATIONS_TYPES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -127,7 +126,7 @@ public void testDoesNotAcceptWrongFirstOperand() {
   @Test
   public void testDoesNotAcceptWrongSecondOperand() {
     Set<SqlTypeName> unsupportedTypes = new HashSet<>(SqlTypeName.ALL_TYPES);
-    unsupportedTypes.removeAll(INTERVALS_DURATIONS_TYPES.keySet());
+    
unsupportedTypes.removeAll(TimeUnitUtils.INTERVALS_DURATIONS_TYPES.keySet());
 
     for (SqlTypeName unsupportedType : unsupportedTypes) {
       BeamSqlPrimitive unsupportedOperand = mock(BeamSqlPrimitive.class);
@@ -142,7 +141,7 @@ public void testDoesNotAcceptWrongSecondOperand() {
 
   @Test
   public void testAcceptsAllSupportedIntervalTypes() {
-    for (SqlTypeName unsupportedType : INTERVALS_DURATIONS_TYPES.keySet()) {
+    for (SqlTypeName unsupportedType : 
TimeUnitUtils.INTERVALS_DURATIONS_TYPES.keySet()) {
       BeamSqlPrimitive unsupportedOperand = mock(BeamSqlPrimitive.class);
       doReturn(unsupportedType).when(unsupportedOperand).getOutputType();
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 137625)
    Time Spent: 7h 20m  (was: 7h 10m)

> Enhance Datetime*Expression Datetime Type
> -----------------------------------------
>
>                 Key: BEAM-4723
>                 URL: https://issues.apache.org/jira/browse/BEAM-4723
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Kai Jiang
>            Assignee: Kai Jiang
>            Priority: Major
>          Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> Datetime*Expression currently supports only the timestamp type for its first
> operand. We should let it accept all Datetime_Types.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to