[FLINK-7934] [table] Upgrade to Calcite 1.15

This closes #5355.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4816a6e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4816a6e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4816a6e7

Branch: refs/heads/master
Commit: 4816a6e7739851de7ae3b8f7676b2f3aa6e20e09
Parents: 37b4e2c
Author: Shuyi Chen <[email protected]>
Authored: Tue Jan 9 16:52:56 2018 -0800
Committer: twalthr <[email protected]>
Committed: Fri Jan 26 13:20:48 2018 +0100

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |    2 +-
 .../org/apache/calcite/rex/RexSimplify.java     | 1275 ++++++++++++++++++
 .../calcite/sql/SqlGroupedWindowFunction.java   |  136 ++
 .../calcite/sql/fun/SqlGroupFunction.java       |  148 --
 .../flink/table/calcite/FlinkTypeFactory.scala  |    2 +-
 .../table/codegen/calls/ExtractCallGen.scala    |  110 ++
 .../table/codegen/calls/FunctionGenerator.scala |   37 +-
 .../flink/table/expressions/aggregations.scala  |    2 +-
 .../apache/flink/table/expressions/time.scala   |   57 +-
 .../flink/table/validate/FunctionCatalog.scala  |   60 +-
 .../table/api/batch/sql/GroupWindowTest.scala   |    2 +-
 .../flink/table/api/batch/table/CalcTest.scala  |   16 +-
 .../table/api/stream/sql/GroupWindowTest.scala  |    2 +-
 13 files changed, 1609 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml 
b/flink-libraries/flink-table/pom.xml
index 374ea06..4bc5072 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -67,7 +67,7 @@ under the License.
                <dependency>
                        <groupId>org.apache.calcite</groupId>
                        <artifactId>calcite-core</artifactId>
-                       <version>1.14.0</version>
+                       <version>1.15.0</version>
                        <exclusions>
                                <exclusion>
                                        
<groupId>org.apache.calcite.avatica</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/main/java/org/apache/calcite/rex/RexSimplify.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rex/RexSimplify.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rex/RexSimplify.java
new file mode 100644
index 0000000..6a305c7
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rex/RexSimplify.java
@@ -0,0 +1,1275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rex;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.Strong;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Range;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL 
CALCITE-2110 IS FIXED.
+ */
+
+/**
+ * Context required to simplify a row-expression.
+ */
+public class RexSimplify {
+       public final RexBuilder rexBuilder;
+       private final RelOptPredicateList predicates;
+       final boolean unknownAsFalse;
+       private final RexExecutor executor;
+
+       /**
+        * Creates a RexSimplify.
+        *
+        * @param rexBuilder Rex builder
+        * @param predicates Predicates known to hold on input fields
+        * @param unknownAsFalse Whether to convert UNKNOWN values to FALSE
+        * @param executor Executor for constant reduction, not null
+        */
+       public RexSimplify(RexBuilder rexBuilder, RelOptPredicateList 
predicates,
+                          boolean unknownAsFalse, RexExecutor executor) {
+               this.rexBuilder = Preconditions.checkNotNull(rexBuilder);
+               this.predicates = Preconditions.checkNotNull(predicates);
+               this.unknownAsFalse = unknownAsFalse;
+               this.executor = Preconditions.checkNotNull(executor);
+       }
+
+       @Deprecated // to be removed before 2.0
+       public RexSimplify(RexBuilder rexBuilder, boolean unknownAsFalse,
+                          RexExecutor executor) {
+               this(rexBuilder, RelOptPredicateList.EMPTY, unknownAsFalse, 
executor);
+       }
+
+       //~ Methods 
----------------------------------------------------------------
+
+       /** Returns a RexSimplify the same as this but with a specified
+        * {@link #unknownAsFalse} value. */
+       public RexSimplify withUnknownAsFalse(boolean unknownAsFalse) {
+               return unknownAsFalse == this.unknownAsFalse
+                               ? this
+                               : new RexSimplify(rexBuilder, predicates, 
unknownAsFalse, executor);
+       }
+
+       /** Returns a RexSimplify the same as this but with a specified
+        * {@link #predicates} value. */
+       public RexSimplify withPredicates(RelOptPredicateList predicates) {
+               return predicates == this.predicates
+                               ? this
+                               : new RexSimplify(rexBuilder, predicates, 
unknownAsFalse, executor);
+       }
+
+       /** Simplifies a boolean expression, always preserving its type and its
+        * nullability.
+        *
+        * <p>This is useful if you are simplifying expressions in a
+        * {@link Project}. */
+       public RexNode simplifyPreservingType(RexNode e) {
+               final RexNode e2 = simplify(e);
+               if (e2.getType() == e.getType()) {
+                       return e2;
+               }
+               final RexNode e3 = rexBuilder.makeCast(e.getType(), e2, true);
+               if (e3.equals(e)) {
+                       return e;
+               }
+               return e3;
+       }
+
+       /**
+        * Simplifies a boolean expression.
+        *
+        * <p>In particular:</p>
+        * <ul>
+        * <li>{@code simplify(x = 1 AND y = 2 AND NOT x = 1)}
+        * returns {@code y = 2}</li>
+        * <li>{@code simplify(x = 1 AND FALSE)}
+        * returns {@code FALSE}</li>
+        * </ul>
+        *
+        * <p>If the expression is a predicate in a WHERE clause, UNKNOWN 
values have
+        * the same effect as FALSE. In situations like this, specify
+        * {@code unknownAsFalse = true}, so that we can switch from 3-valued 
logic to
+        * simpler 2-valued logic and make more optimizations.
+        *
+        * @param e Expression to simplify
+        */
+       public RexNode simplify(RexNode e) {
+               switch (e.getKind()) {
+                       case AND:
+                               return simplifyAnd((RexCall) e);
+                       case OR:
+                               return simplifyOr((RexCall) e);
+                       case NOT:
+                               return simplifyNot((RexCall) e);
+                       case CASE:
+                               return simplifyCase((RexCall) e);
+                       case CAST:
+                               return simplifyCast((RexCall) e);
+                       case IS_NULL:
+                       case IS_NOT_NULL:
+                       case IS_TRUE:
+                       case IS_NOT_TRUE:
+                       case IS_FALSE:
+                       case IS_NOT_FALSE:
+                               assert e instanceof RexCall;
+                               return simplifyIs((RexCall) e);
+                       case EQUALS:
+                       case GREATER_THAN:
+                       case GREATER_THAN_OR_EQUAL:
+                       case LESS_THAN:
+                       case LESS_THAN_OR_EQUAL:
+                       case NOT_EQUALS:
+                               return simplifyComparison((RexCall) e);
+                       default:
+                               return e;
+               }
+       }
+
+       // e must be a comparison (=, >, >=, <, <=, !=)
+       private RexNode simplifyComparison(RexCall e) {
+               //noinspection unchecked
+               return simplifyComparison(e, Comparable.class);
+       }
+
+       // e must be a comparison (=, >, >=, <, <=, !=)
+       private <C extends Comparable<C>> RexNode simplifyComparison(RexCall e,
+                                                                    Class<C> 
clazz) {
+               final List<RexNode> operands = new ArrayList<>(e.operands);
+               simplifyList(operands);
+
+               // Simplify "x <op> x"
+               final RexNode o0 = operands.get(0);
+               final RexNode o1 = operands.get(1);
+               if (RexUtil.eq(o0, o1)
+                               && (unknownAsFalse
+                               || (!o0.getType().isNullable()
+                               && !o1.getType().isNullable()))) {
+                       switch (e.getKind()) {
+                               case EQUALS:
+                               case GREATER_THAN_OR_EQUAL:
+                               case LESS_THAN_OR_EQUAL:
+                                       // "x = x" simplifies to "x is not 
null" (similarly <= and >=)
+                                       return simplify(
+                                                       
rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, o0));
+                               default:
+                                       // "x != x" simplifies to "false" 
(similarly < and >)
+                                       return rexBuilder.makeLiteral(false);
+                       }
+               }
+
+               // Simplify "<literal1> <op> <literal2>"
+               // For example, "1 = 2" becomes FALSE;
+               // "1 != 1" becomes FALSE;
+               // "1 != NULL" becomes UNKNOWN (or FALSE if unknownAsFalse);
+               // "1 != '1'" is unchanged because the types are not the same.
+               if (o0.isA(SqlKind.LITERAL)
+                               && o1.isA(SqlKind.LITERAL)
+                               && o0.getType().equals(o1.getType())) {
+                       final C v0 = ((RexLiteral) o0).getValueAs(clazz);
+                       final C v1 = ((RexLiteral) o1).getValueAs(clazz);
+                       if (v0 == null || v1 == null) {
+                               return unknownAsFalse
+                                               ? rexBuilder.makeLiteral(false)
+                                               : 
rexBuilder.makeNullLiteral(e.getType());
+                       }
+                       final int comparisonResult = v0.compareTo(v1);
+                       switch (e.getKind()) {
+                               case EQUALS:
+                                       return 
rexBuilder.makeLiteral(comparisonResult == 0);
+                               case GREATER_THAN:
+                                       return 
rexBuilder.makeLiteral(comparisonResult > 0);
+                               case GREATER_THAN_OR_EQUAL:
+                                       return 
rexBuilder.makeLiteral(comparisonResult >= 0);
+                               case LESS_THAN:
+                                       return 
rexBuilder.makeLiteral(comparisonResult < 0);
+                               case LESS_THAN_OR_EQUAL:
+                                       return 
rexBuilder.makeLiteral(comparisonResult <= 0);
+                               case NOT_EQUALS:
+                                       return 
rexBuilder.makeLiteral(comparisonResult != 0);
+                               default:
+                                       throw new AssertionError();
+                       }
+               }
+
+               // If none of the arguments were simplified, return the call 
unchanged.
+               final RexNode e2;
+               if (operands.equals(e.operands)) {
+                       e2 = e;
+               } else {
+                       e2 = rexBuilder.makeCall(e.op, operands);
+               }
+               return simplifyUsingPredicates(e2, clazz);
+       }
+
+       /**
+        * Simplifies a conjunction of boolean expressions.
+        */
+       public RexNode simplifyAnds(Iterable<? extends RexNode> nodes) {
+               final List<RexNode> terms = new ArrayList<>();
+               final List<RexNode> notTerms = new ArrayList<>();
+               for (RexNode e : nodes) {
+                       RelOptUtil.decomposeConjunction(e, terms, notTerms);
+               }
+               simplifyList(terms);
+               simplifyList(notTerms);
+               if (unknownAsFalse) {
+                       return simplifyAnd2ForUnknownAsFalse(terms, notTerms);
+               }
+               return simplifyAnd2(terms, notTerms);
+       }
+
+       private void simplifyList(List<RexNode> terms) {
+               for (int i = 0; i < terms.size(); i++) {
+                       terms.set(i, 
withUnknownAsFalse(false).simplify(terms.get(i)));
+               }
+       }
+
+       private RexNode simplifyNot(RexCall call) {
+               final RexNode a = call.getOperands().get(0);
+               switch (a.getKind()) {
+                       case NOT:
+                               // NOT NOT x ==> x
+                               return simplify(((RexCall) 
a).getOperands().get(0));
+               }
+               final SqlKind negateKind = a.getKind().negate();
+               if (a.getKind() != negateKind) {
+                       return simplify(
+                                       
rexBuilder.makeCall(RexUtil.op(negateKind),
+                                                       
ImmutableList.of(((RexCall) a).getOperands().get(0))));
+               }
+               final SqlKind negateKind2 = a.getKind().negateNullSafe();
+               if (a.getKind() != negateKind2) {
+                       return simplify(
+                                       
rexBuilder.makeCall(RexUtil.op(negateKind2),
+                                                       ((RexCall) 
a).getOperands()));
+               }
+               if (a.getKind() == SqlKind.AND) {
+                       // NOT distributivity for AND
+                       final List<RexNode> newOperands = new ArrayList<>();
+                       for (RexNode operand : ((RexCall) a).getOperands()) {
+                               newOperands.add(
+                                               
simplify(rexBuilder.makeCall(SqlStdOperatorTable.NOT, operand)));
+                       }
+                       return 
simplify(rexBuilder.makeCall(SqlStdOperatorTable.OR, newOperands));
+               }
+               if (a.getKind() == SqlKind.OR) {
+                       // NOT distributivity for OR
+                       final List<RexNode> newOperands = new ArrayList<>();
+                       for (RexNode operand : ((RexCall) a).getOperands()) {
+                               newOperands.add(
+                                               
simplify(rexBuilder.makeCall(SqlStdOperatorTable.NOT, operand)));
+                       }
+                       return simplify(
+                                       
rexBuilder.makeCall(SqlStdOperatorTable.AND, newOperands));
+               }
+               return call;
+       }
+
+       private RexNode simplifyIs(RexCall call) {
+               final SqlKind kind = call.getKind();
+               final RexNode a = call.getOperands().get(0);
+               final RexNode simplified = simplifyIs2(kind, a);
+               if (simplified != null) {
+                       return simplified;
+               }
+               return call;
+       }
+
+       private RexNode simplifyIs2(SqlKind kind, RexNode a) {
+               switch (kind) {
+                       case IS_NULL:
+                               // x IS NULL ==> FALSE (if x is not nullable)
+                               if (!a.getType().isNullable()) {
+                                       return rexBuilder.makeLiteral(false);
+                               }
+                               break;
+                       case IS_NOT_NULL:
+                               // x IS NOT NULL ==> TRUE (if x is not nullable)
+                               RexNode simplified = simplifyIsNotNull(a);
+                               if (simplified != null) {
+                                       return simplified;
+                               }
+                               break;
+                       case IS_TRUE:
+                       case IS_NOT_FALSE:
+                               // x IS TRUE ==> x (if x is not nullable)
+                               // x IS NOT FALSE ==> x (if x is not nullable)
+                               if (!a.getType().isNullable()) {
+                                       return simplify(a);
+                               }
+                               break;
+                       case IS_FALSE:
+                       case IS_NOT_TRUE:
+                               // x IS NOT TRUE ==> NOT x (if x is not 
nullable)
+                               // x IS FALSE ==> NOT x (if x is not nullable)
+                               if (!a.getType().isNullable()) {
+                                       return 
simplify(rexBuilder.makeCall(SqlStdOperatorTable.NOT, a));
+                               }
+                               break;
+               }
+               switch (a.getKind()) {
+                       case NOT:
+                               // (NOT x) IS TRUE ==> x IS FALSE
+                               // Similarly for IS NOT TRUE, IS FALSE, etc.
+                               //
+                               // Note that
+                               //   (NOT x) IS TRUE !=> x IS NOT TRUE
+                               // because of null values.
+                               final SqlOperator notKind = 
RexUtil.op(kind.negateNullSafe());
+                               final RexNode arg = ((RexCall) 
a).operands.get(0);
+                               return simplify(rexBuilder.makeCall(notKind, 
arg));
+               }
+               RexNode a2 = simplify(a);
+               if (a != a2) {
+                       return rexBuilder.makeCall(RexUtil.op(kind), 
ImmutableList.of(a2));
+               }
+               return null; // cannot be simplified
+       }
+
+       private RexNode simplifyIsNotNull(RexNode a) {
+               if (!a.getType().isNullable()) {
+                       return rexBuilder.makeLiteral(true);
+               }
+               switch (Strong.policy(a.getKind())) {
+                       case ANY:
+                               final List<RexNode> operands = new 
ArrayList<>();
+                               for (RexNode operand : ((RexCall) 
a).getOperands()) {
+                                       final RexNode simplified = 
simplifyIsNotNull(operand);
+                                       if (simplified == null) {
+                                               operands.add(
+                                                               
rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, operand));
+                                       } else if (simplified.isAlwaysFalse()) {
+                                               return 
rexBuilder.makeLiteral(false);
+                                       } else {
+                                               operands.add(simplified);
+                                       }
+                               }
+                               return RexUtil.composeConjunction(rexBuilder, 
operands, false);
+                       case CUSTOM:
+                               switch (a.getKind()) {
+                                       case LITERAL:
+                                               return 
rexBuilder.makeLiteral(!((RexLiteral) a).isNull());
+                                       default:
+                                               throw new AssertionError("every 
CUSTOM policy needs a handler, "
+                                                               + a.getKind());
+                               }
+                       case AS_IS:
+                       default:
+                               return null;
+               }
+       }
+
+       private RexNode simplifyCase(RexCall call) {
+               final List<RexNode> operands = call.getOperands();
+               final List<RexNode> newOperands = new ArrayList<>();
+               final Set<String> values = new HashSet<>();
+               for (int i = 0; i < operands.size(); i++) {
+                       RexNode operand = operands.get(i);
+                       if (RexUtil.isCasePredicate(call, i)) {
+                               if (operand.isAlwaysTrue()) {
+                                       // Predicate is always TRUE. Make value 
the ELSE and quit.
+                                       newOperands.add(operands.get(++i));
+                                       if (unknownAsFalse && 
RexUtil.isNull(operands.get(i))) {
+                                               
values.add(rexBuilder.makeLiteral(false).toString());
+                                       } else {
+                                               
values.add(operands.get(i).toString());
+                                       }
+                                       break;
+                               } else if (operand.isAlwaysFalse() || 
RexUtil.isNull(operand)) {
+                                       // Predicate is always FALSE or NULL. 
Skip predicate and value.
+                                       ++i;
+                                       continue;
+                               }
+                       } else {
+                               if (unknownAsFalse && RexUtil.isNull(operand)) {
+                                       
values.add(rexBuilder.makeLiteral(false).toString());
+                               } else {
+                                       values.add(operand.toString());
+                               }
+                       }
+                       newOperands.add(operand);
+               }
+               assert newOperands.size() % 2 == 1;
+               if (newOperands.size() == 1 || values.size() == 1) {
+                       final RexNode last = Util.last(newOperands);
+                       if (!call.getType().equals(last.getType())) {
+                               return 
rexBuilder.makeAbstractCast(call.getType(), last);
+                       }
+                       return last;
+               }
+               trueFalse:
+               if (call.getType().getSqlTypeName() == SqlTypeName.BOOLEAN) {
+                       // Optimize CASE where every branch returns constant 
true or constant
+                       // false.
+                       final List<Pair<RexNode, RexNode>> pairs =
+                                       casePairs(rexBuilder, newOperands);
+                       // 1) Possible simplification if unknown is treated as 
false:
+                       //   CASE
+                       //   WHEN p1 THEN TRUE
+                       //   WHEN p2 THEN TRUE
+                       //   ELSE FALSE
+                       //   END
+                       // can be rewritten to: (p1 or p2)
+                       if (unknownAsFalse) {
+                               final List<RexNode> terms = new ArrayList<>();
+                               int pos = 0;
+                               for (; pos < pairs.size(); pos++) {
+                                       // True block
+                                       Pair<RexNode, RexNode> pair = 
pairs.get(pos);
+                                       if (!pair.getValue().isAlwaysTrue()) {
+                                               break;
+                                       }
+                                       terms.add(pair.getKey());
+                               }
+                               for (; pos < pairs.size(); pos++) {
+                                       // False block
+                                       Pair<RexNode, RexNode> pair = 
pairs.get(pos);
+                                       if (!pair.getValue().isAlwaysFalse()
+                                                       && 
!RexUtil.isNull(pair.getValue())) {
+                                               break;
+                                       }
+                               }
+                               if (pos == pairs.size()) {
+                                       final RexNode disjunction =
+                                                       
RexUtil.composeDisjunction(rexBuilder, terms);
+                                       if 
(!call.getType().equals(disjunction.getType())) {
+                                               return 
rexBuilder.makeCast(call.getType(), disjunction);
+                                       }
+                                       return disjunction;
+                               }
+                       }
+                       // 2) Another simplification
+                       //   CASE
+                       //   WHEN p1 THEN TRUE
+                       //   WHEN p2 THEN FALSE
+                       //   WHEN p3 THEN TRUE
+                       //   ELSE FALSE
+                       //   END
+                       // if p1...pn cannot be nullable
+                       for (Ord<Pair<RexNode, RexNode>> pair : Ord.zip(pairs)) 
{
+                               if (pair.e.getKey().getType().isNullable()) {
+                                       break trueFalse;
+                               }
+                               if (!pair.e.getValue().isAlwaysTrue()
+                                               && 
!pair.e.getValue().isAlwaysFalse()
+                                               && (!unknownAsFalse || 
!RexUtil.isNull(pair.e.getValue()))) {
+                                       break trueFalse;
+                               }
+                       }
+                       final List<RexNode> terms = new ArrayList<>();
+                       final List<RexNode> notTerms = new ArrayList<>();
+                       for (Ord<Pair<RexNode, RexNode>> pair : Ord.zip(pairs)) 
{
+                               if (pair.e.getValue().isAlwaysTrue()) {
+                                       terms.add(RexUtil.andNot(rexBuilder, 
pair.e.getKey(), notTerms));
+                               } else {
+                                       notTerms.add(pair.e.getKey());
+                               }
+                       }
+                       final RexNode disjunction = 
RexUtil.composeDisjunction(rexBuilder, terms);
+                       if (!call.getType().equals(disjunction.getType())) {
+                               return rexBuilder.makeCast(call.getType(), 
disjunction);
+                       }
+                       return disjunction;
+               }
+               if (newOperands.equals(operands)) {
+                       return call;
+               }
+               return call.clone(call.getType(), newOperands);
+       }
+
+       /** Given "CASE WHEN p1 THEN v1 ... ELSE e END"
+        * returns [(p1, v1), ..., (true, e)]. */
+       private static List<Pair<RexNode, RexNode>> casePairs(RexBuilder 
rexBuilder,
+                                                             List<RexNode> 
operands) {
+               final ImmutableList.Builder<Pair<RexNode, RexNode>> builder =
+                               ImmutableList.builder();
+               for (int i = 0; i < operands.size() - 1; i += 2) {
+                       builder.add(Pair.of(operands.get(i), operands.get(i + 
1)));
+               }
+               builder.add(
+                               Pair.of((RexNode) rexBuilder.makeLiteral(true), 
Util.last(operands)));
+               return builder.build();
+       }
+
+       public RexNode simplifyAnd(RexCall e) {
+               final List<RexNode> terms = new ArrayList<>();
+               final List<RexNode> notTerms = new ArrayList<>();
+               RelOptUtil.decomposeConjunction(e, terms, notTerms);
+               simplifyList(terms);
+               simplifyList(notTerms);
+               if (unknownAsFalse) {
+                       return simplifyAnd2ForUnknownAsFalse(terms, notTerms);
+               }
+               return simplifyAnd2(terms, notTerms);
+       }
+
+       RexNode simplifyAnd2(List<RexNode> terms, List<RexNode> notTerms) {
+               for (RexNode term : terms) {
+                       if (term.isAlwaysFalse()) {
+                               return rexBuilder.makeLiteral(false);
+                       }
+               }
+               if (terms.isEmpty() && notTerms.isEmpty()) {
+                       return rexBuilder.makeLiteral(true);
+               }
+               // If one of the negated terms is a conjunction whose conjuncts are 
all
+               // contained in the list of positive terms, the expression is not
+               // satisfiable.
+               //
+               // Example #1. x AND y AND z AND NOT (x AND y)  - not 
satisfiable
+               // Example #2. x AND y AND NOT (x AND y)        - not 
satisfiable
+               // Example #3. x AND y AND NOT (x AND y AND z)  - may be 
satisfiable
+               for (RexNode notDisjunction : notTerms) {
+                       final List<RexNode> terms2 = 
RelOptUtil.conjunctions(notDisjunction);
+                       if (terms.containsAll(terms2)) {
+                               return rexBuilder.makeLiteral(false);
+                       }
+               }
+               // Add the NOT disjunctions back in.
+               for (RexNode notDisjunction : notTerms) {
+                       terms.add(
+                                       simplify(
+                                                       
rexBuilder.makeCall(SqlStdOperatorTable.NOT, notDisjunction)));
+               }
+               return RexUtil.composeConjunction(rexBuilder, terms, false);
+       }
+
+       /**
+        * Variant of {@link #simplifyAnd2(List, List)} for callers that interpret an
+        * UNKNOWN result of the expression as FALSE.
+        *
+        * <p>Delegates to the three-argument overload, passing
+        * {@code Comparable.class} as the class used to read literal values.
+        */
+       RexNode simplifyAnd2ForUnknownAsFalse(List<RexNode> terms, List<RexNode> notTerms) {
+               //noinspection unchecked
+               return simplifyAnd2ForUnknownAsFalse(terms, notTerms, Comparable.class);
+       }
+
+       /**
+        * Simplifies a conjunction of {@code terms} and negated {@code notTerms},
+        * treating an UNKNOWN result as FALSE.
+        *
+        * <p>Returns a FALSE literal as soon as a contradiction is detected (a term
+        * that is always false, a comparison against a NULL literal, a reference
+        * equated to two different constants, an empty range reported by
+        * {@code processRange}, an operand that is both IS NULL and used in a
+        * comparison, or a term whose negation is also present). Returns a TRUE
+        * literal when both lists are empty.
+        *
+        * <p>{@code terms} is modified in place: terms are replaced, removed and
+        * appended while the method runs.
+        *
+        * @param terms    positive conjuncts
+        * @param notTerms expressions that appear negated in the conjunction
+        * @param clazz    class used to read literal values via
+        *                 {@code RexLiteral.getValueAs}
+        */
+       private <C extends Comparable<C>> RexNode simplifyAnd2ForUnknownAsFalse(
+                       List<RexNode> terms, List<RexNode> notTerms, Class<C> 
clazz) {
+               for (RexNode term : terms) {
+                       if (term.isAlwaysFalse()) {
+                               return rexBuilder.makeLiteral(false);
+                       }
+               }
+               if (terms.isEmpty() && notTerms.isEmpty()) {
+                       return rexBuilder.makeLiteral(true);
+               }
+               if (terms.size() == 1 && notTerms.isEmpty()) {
+                       // Make sure "x OR y OR x" (a single-term conjunction) 
gets simplified.
+                       return simplify(terms.get(0));
+               }
+               // Try to simplify the expression
+               // Bookkeeping filled by the main loop below; most entries are keyed by
+               // the string form of an operand:
+               //  - equalityTerms: "ref = ref" terms, keyed by the left operand
+               //  - rangeTerms: passed to processRange, keyed there by the reference
+               //  - equalityConstantTerms: reference -> literal of "ref = constant"
+               //  - negatedTerms: negations (and their inversions) of comparison terms
+               //  - nullOperands / notNullOperands: operands of IS NULL / IS NOT NULL
+               //  - comparedOperands: operands seen in comparisons, IN and BETWEEN
+               final Multimap<String, Pair<String, RexNode>> equalityTerms = 
ArrayListMultimap.create();
+               final Map<String, Pair<Range<C>, List<RexNode>>> rangeTerms =
+                               new HashMap<>();
+               final Map<String, String> equalityConstantTerms = new 
HashMap<>();
+               final Set<String> negatedTerms = new HashSet<>();
+               final Set<String> nullOperands = new HashSet<>();
+               final Set<RexNode> notNullOperands = new LinkedHashSet<>();
+               final Set<String> comparedOperands = new HashSet<>();
+
+               // Add the predicates from the source to the range terms.
+               for (RexNode predicate : predicates.pulledUpPredicates) {
+                       final Comparison comparison = Comparison.of(predicate);
+                       if (comparison != null
+                                       && comparison.kind != 
SqlKind.NOT_EQUALS) { // not supported yet
+                               final C v0 = 
comparison.literal.getValueAs(clazz);
+                               if (v0 != null) {
+                                       final RexNode result = 
processRange(rexBuilder, terms, rangeTerms,
+                                                       predicate, 
comparison.ref, v0, comparison.kind);
+                                       if (result != null) {
+                                               // Not satisfiable
+                                               return result;
+                                       }
+                               }
+                       }
+               }
+
+               // Index-based loop: entries of "terms" are replaced and removed while
+               // iterating.
+               for (int i = 0; i < terms.size(); i++) {
+                       RexNode term = terms.get(i);
+                       // Non-deterministic terms are left untouched.
+                       if (!RexUtil.isDeterministic(term)) {
+                               continue;
+                       }
+                       // Simplify BOOLEAN expressions if possible
+                       while (term.getKind() == SqlKind.EQUALS) {
+                               RexCall call = (RexCall) term;
+                               if (call.getOperands().get(0).isAlwaysTrue()) {
+                                       term = call.getOperands().get(1);
+                                       terms.set(i, term);
+                                       continue;
+                               } else if 
(call.getOperands().get(1).isAlwaysTrue()) {
+                                       term = call.getOperands().get(0);
+                                       terms.set(i, term);
+                                       continue;
+                               }
+                               break;
+                       }
+                       switch (term.getKind()) {
+                               case EQUALS:
+                               case NOT_EQUALS:
+                               case LESS_THAN:
+                               case GREATER_THAN:
+                               case LESS_THAN_OR_EQUAL:
+                               case GREATER_THAN_OR_EQUAL:
+                                       RexCall call = (RexCall) term;
+                                       RexNode left = 
call.getOperands().get(0);
+                                       comparedOperands.add(left.toString());
+                                       // if it is a cast, we include the 
inner reference
+                                       if (left.getKind() == SqlKind.CAST) {
+                                               RexCall leftCast = (RexCall) 
left;
+                                               
comparedOperands.add(leftCast.getOperands().get(0).toString());
+                                       }
+                                       RexNode right = 
call.getOperands().get(1);
+                                       comparedOperands.add(right.toString());
+                                       // if it is a cast, we include the 
inner reference
+                                       if (right.getKind() == SqlKind.CAST) {
+                                               RexCall rightCast = (RexCall) 
right;
+                                               
comparedOperands.add(rightCast.getOperands().get(0).toString());
+                                       }
+                                       final Comparison comparison = 
Comparison.of(term);
+                                       // Check for comparison with null values
+                                       if (comparison != null
+                                                       && 
comparison.literal.getValue() == null) {
+                                               return 
rexBuilder.makeLiteral(false);
+                                       }
+                                       // Check for equality on different 
constants. If the same ref or CAST(ref)
+                                       // is equal to different constants, 
this condition cannot be satisfied,
+                                       // and hence it can be evaluated to 
FALSE
+                                       if (term.getKind() == SqlKind.EQUALS) {
+                                               if (comparison != null) {
+                                                       final String literal = 
comparison.literal.toString();
+                                                       final String 
prevLiteral =
+                                                                       
equalityConstantTerms.put(comparison.ref.toString(), literal);
+                                                       if (prevLiteral != null 
&& !literal.equals(prevLiteral)) {
+                                                               return 
rexBuilder.makeLiteral(false);
+                                                       }
+                                               } else if 
(RexUtil.isReferenceOrAccess(left, true)
+                                                               && 
RexUtil.isReferenceOrAccess(right, true)) {
+                                                       
equalityTerms.put(left.toString(), Pair.of(right.toString(), term));
+                                               }
+                                       }
+                                       // Assume the expression a > 5 is part 
of a Filter condition.
+                                       // Then we can derive the negated term: 
a <= 5.
+                                       // But as the comparison is string 
based and thus operands order dependent,
+                                       // we should also add the inverted 
negated term: 5 >= a.
+                                       // Observe that for creating the 
inverted term we invert the list of operands.
+                                       RexNode negatedTerm = 
RexUtil.negate(rexBuilder, call);
+                                       if (negatedTerm != null) {
+                                               
negatedTerms.add(negatedTerm.toString());
+                                               RexNode invertNegatedTerm = 
RexUtil.invert(rexBuilder, (RexCall) negatedTerm);
+                                               if (invertNegatedTerm != null) {
+                                                       
negatedTerms.add(invertNegatedTerm.toString());
+                                               }
+                                       }
+                                       // Remove terms that are implied by 
predicates on the input,
+                                       // or weaken terms that are partially 
implied.
+                                       // E.g. given predicate "x >= 5" and 
term "x between 3 and 10"
+                                       // we weaken to term to "x between 5 
and 10".
+                                       final RexNode term2 = 
simplifyUsingPredicates(term, clazz);
+                                       if (term2 != term) {
+                                               terms.set(i, term = term2);
+                                       }
+                                       // Range
+                                       // NOTE: "comparison" was derived from the term before it was
+                                       // possibly replaced by term2 just above.
+                                       if (comparison != null
+                                                       && comparison.kind != 
SqlKind.NOT_EQUALS) { // not supported yet
+                                               final C constant = 
comparison.literal.getValueAs(clazz);
+                                               final RexNode result = 
processRange(rexBuilder, terms, rangeTerms,
+                                                               term, 
comparison.ref, constant, comparison.kind);
+                                               if (result != null) {
+                                                       // Not satisfiable
+                                                       return result;
+                                               }
+                                       }
+                                       break;
+                               case IN:
+                                       comparedOperands.add(((RexCall) 
term).operands.get(0).toString());
+                                       break;
+                               case BETWEEN:
+                                       comparedOperands.add(((RexCall) 
term).operands.get(1).toString());
+                                       break;
+                               case IS_NOT_NULL:
+                                       // Taken out of the list here; re-added after the loop only
+                                       // if the operand is not used in a comparison.
+                                       notNullOperands.add(((RexCall) 
term).getOperands().get(0));
+                                       terms.remove(i);
+                                       --i;
+                                       break;
+                               case IS_NULL:
+                                       // Last case of the switch, so no break is needed.
+                                       nullOperands.add(((RexCall) 
term).getOperands().get(0).toString());
+                       }
+               }
+               // If one column should be null and is in a comparison 
predicate,
+               // it is not satisfiable.
+               // Example. IS NULL(x) AND x < 5  - not satisfiable
+               if (!Collections.disjoint(nullOperands, comparedOperands)) {
+                       return rexBuilder.makeLiteral(false);
+               }
+               // Check for equality of two refs wrt equality with constants
+               // Example #1. x=5 AND y=5 AND x=y : x=5 AND y=5
+               // Example #2. x=5 AND y=6 AND x=y - not satisfiable
+               for (String ref1 : equalityTerms.keySet()) {
+                       final String literal1 = equalityConstantTerms.get(ref1);
+                       if (literal1 == null) {
+                               continue;
+                       }
+                       Collection<Pair<String, RexNode>> references = 
equalityTerms.get(ref1);
+                       for (Pair<String, RexNode> ref2 : references) {
+                               final String literal2 = 
equalityConstantTerms.get(ref2.left);
+                               if (literal2 == null) {
+                                       continue;
+                               }
+                               if (!literal1.equals(literal2)) {
+                                       // If an expression is equal to two 
different constants,
+                                       // it is not satisfiable
+                                       return rexBuilder.makeLiteral(false);
+                               }
+                               // Otherwise we can remove the term, as we 
already know that
+                               // the expression is equal to two constants
+                               terms.remove(ref2.right);
+                       }
+               }
+               // Remove not necessary IS NOT NULL expressions.
+               //
+               // Example. IS NOT NULL(x) AND x < 5  : x < 5
+               for (RexNode operand : notNullOperands) {
+                       if (!comparedOperands.contains(operand.toString())) {
+                               terms.add(
+                                               
rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, operand));
+                       }
+               }
+               // If one of the not-disjunctions is a disjunction that is 
wholly
+               // contained in the disjunctions list, the expression is not
+               // satisfiable.
+               //
+               // Example #1. x AND y AND z AND NOT (x AND y)  - not 
satisfiable
+               // Example #2. x AND y AND NOT (x AND y)        - not 
satisfiable
+               // Example #3. x AND y AND NOT (x AND y AND z)  - may be 
satisfiable
+               // Snapshot of the string forms of the terms, taken before the NOT
+               // terms are added back; the negated-term check further down also
+               // compares against this snapshot.
+               final Set<String> termsSet = new 
HashSet<>(RexUtil.strings(terms));
+               for (RexNode notDisjunction : notTerms) {
+                       if (!RexUtil.isDeterministic(notDisjunction)) {
+                               continue;
+                       }
+                       final List<String> terms2Set = 
RexUtil.strings(RelOptUtil.conjunctions(notDisjunction));
+                       if (termsSet.containsAll(terms2Set)) {
+                               return rexBuilder.makeLiteral(false);
+                       }
+               }
+               // Add the NOT disjunctions back in.
+               for (RexNode notDisjunction : notTerms) {
+                       final RexNode call =
+                                       
rexBuilder.makeCall(SqlStdOperatorTable.NOT, notDisjunction);
+                       terms.add(simplify(call));
+               }
+               // The negated terms: only deterministic expressions
+               for (String negatedTerm : negatedTerms) {
+                       if (termsSet.contains(negatedTerm)) {
+                               return rexBuilder.makeLiteral(false);
+                       }
+               }
+               return RexUtil.composeConjunction(rexBuilder, terms, false);
+       }
+
+       private <C extends Comparable<C>> RexNode 
simplifyUsingPredicates(RexNode e,
+                                                                         
Class<C> clazz) {
+               final Comparison comparison = Comparison.of(e);
+               // Check for comparison with null values
+               if (comparison == null
+                               || comparison.kind == SqlKind.NOT_EQUALS
+                               || comparison.literal.getValue() == null) {
+                       return e;
+               }
+               final C v0 = comparison.literal.getValueAs(clazz);
+               final Range<C> range = range(comparison.kind, v0);
+               final Range<C> range2 =
+                               residue(comparison.ref, range, 
predicates.pulledUpPredicates,
+                                               clazz);
+               if (range2 == null) {
+                       // Term is impossible to satisfy given these predicates
+                       return rexBuilder.makeLiteral(false);
+               } else if (range2.equals(range)) {
+                       // no change
+                       return e;
+               } else if (range2.equals(Range.all())) {
+                       // Term is always satisfied given these predicates
+                       return rexBuilder.makeLiteral(true);
+               } else if 
(range2.lowerEndpoint().equals(range2.upperEndpoint())) {
+                       if (range2.lowerBoundType() == BoundType.OPEN
+                                       || range2.upperBoundType() == 
BoundType.OPEN) {
+                               // range is a point, but does not include its 
endpoint, therefore is
+                               // effectively empty
+                               return rexBuilder.makeLiteral(false);
+                       }
+                       // range is now a point; it's worth simplifying
+                       return rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, 
comparison.ref,
+                                       
rexBuilder.makeLiteral(range2.lowerEndpoint(),
+                                                       
comparison.literal.getType(), comparison.literal.getTypeName()));
+               } else {
+                       // range has been reduced but it's not worth simplifying
+                       return e;
+               }
+       }
+
+       /** Weakens a term so that it checks only what is not implied by predicates.
+        *
+        * <p>The term is broken into "ref comparison constant",
+        * for example "$0 &lt; 5", and is passed in as the reference {@code ref}
+        * plus the range {@code r0} of values that satisfy the comparison.
+        *
+        * <p>Only the first predicate of the form "ref comparison literal" with a
+        * non-NULL literal is taken into account. Predicates that compare
+        * {@code ref} with a NULL literal are skipped, in the same way the
+        * pulled-up predicates are filtered in
+        * {@code simplifyAnd2ForUnknownAsFalse}, because no range can be built from
+        * a NULL endpoint.
+        *
+        * <p>Examples:
+        * <ul>
+        *
+        * <li>{@code residue($0 < 10, [$0 < 5])} returns {@link Range#all()}
+        * (the term is always satisfied)
+        *
+        * <li>{@code residue($0 < 10, [$0 < 20, $0 > 0])} returns the range of
+        * {@code $0 < 10}
+        * </ul>
+        *
+        * @param ref        reference the term compares against a constant
+        * @param r0         range of values that satisfy the term
+        * @param predicates predicates known to hold
+        * @param clazz      class used to read literal values
+        * @return {@link Range#all()} if the term is implied, the intersection of
+        *         both ranges if they are connected, {@code null} (meaning the empty
+        *         range) if they are not, or {@code r0} if no predicate applies
+        */
+       private <C extends Comparable<C>> Range<C> residue(RexNode ref, Range<C> r0,
+                       List<RexNode> predicates, Class<C> clazz) {
+               for (RexNode predicate : predicates) {
+                       switch (predicate.getKind()) {
+                               case EQUALS:
+                               case LESS_THAN:
+                               case LESS_THAN_OR_EQUAL:
+                               case GREATER_THAN:
+                               case GREATER_THAN_OR_EQUAL:
+                                       final RexCall call = (RexCall) predicate;
+                                       if (call.operands.get(0).equals(ref)
+                                                       && call.operands.get(1) instanceof RexLiteral) {
+                                               final RexLiteral literal = (RexLiteral) call.operands.get(1);
+                                               final C c1 = literal.getValueAs(clazz);
+                                               if (c1 == null) {
+                                                       // Comparison with a NULL literal: there is no endpoint to
+                                                       // build a range from, so this predicate is ignored.
+                                                       continue;
+                                               }
+                                               final Range<C> r1 = range(predicate.getKind(), c1);
+                                               if (r0.encloses(r1)) {
+                                                       // Given these predicates, term is always satisfied.
+                                                       // e.g. r0 is "$0 < 10", r1 is "$0 < 5"
+                                                       return Range.all();
+                                               }
+                                               if (r0.isConnected(r1)) {
+                                                       return r0.intersection(r1);
+                                               }
+                                               // Ranges do not intersect. Return null meaning the empty range.
+                                               return null;
+                                       }
+                                       break;
+                               default:
+                                       break;
+                       }
+               }
+               return r0;
+       }
+
+       /**
+        * Simplifies an OR call, e.g. turns OR(x, x) into x.
+        *
+        * <p>The call is flattened into its disjuncts, which are then handed to
+        * {@link #simplifyOrs(List)}.
+        */
+       public RexNode simplifyOr(RexCall call) {
+               assert call.getKind() == SqlKind.OR;
+               return simplifyOrs(RelOptUtil.disjunctions(call));
+       }
+
+       /** Simplifies a list of terms and combines them into an OR.
+        * Modifies the list in place. */
+       public RexNode simplifyOrs(List<RexNode> terms) {
+               for (int i = 0; i < terms.size(); i++) {
+                       final RexNode term = simplify(terms.get(i));
+                       switch (term.getKind()) {
+                               case LITERAL:
+                                       if (!RexLiteral.isNullLiteral(term)) {
+                                               if 
(RexLiteral.booleanValue(term)) {
+                                                       return term; // true
+                                               } else {
+                                                       terms.remove(i);
+                                                       --i;
+                                                       continue;
+                                               }
+                                       }
+                       }
+                       terms.set(i, term);
+               }
+               return RexUtil.composeDisjunction(rexBuilder, terms);
+       }
+
+       /**
+        * Simplifies a CAST whose operand is a literal; any other CAST is returned
+        * unchanged.
+        *
+        * <p>If the cast can be removed without changing the value, the literal is
+        * re-created in the target type. A TIME literal cast to TIMESTAMP is left
+        * alone. Otherwise the cast is reduced through the executor.
+        */
+       private RexNode simplifyCast(RexCall e) {
+               final RexNode operand = e.getOperands().get(0);
+               if (operand.getKind() != SqlKind.LITERAL) {
+                       return e;
+               }
+               final RexLiteral literal = (RexLiteral) operand;
+               final Comparable value = literal.getValueAs(Comparable.class);
+               final SqlTypeName literalType = literal.getTypeName();
+
+               // First, try to remove the cast without changing the value.
+               // makeCast and canRemoveCastFromLiteral have the same logic, so we are
+               // sure to be able to remove the cast.
+               final boolean removable =
+                               rexBuilder.canRemoveCastFromLiteral(e.getType(), value, literalType);
+               if (removable) {
+                       return rexBuilder.makeCast(e.getType(), operand);
+               }
+
+               // Next, try to convert the value to a different type,
+               // e.g. CAST('123' as integer)
+               switch (literal.getTypeName()) {
+                       case TIME:
+                               switch (e.getType().getSqlTypeName()) {
+                                       case TIMESTAMP:
+                                               return e;
+                               }
+                               break;
+               }
+               final List<RexNode> reduced = new ArrayList<>();
+               executor.reduce(rexBuilder, ImmutableList.<RexNode>of(e), reduced);
+               final RexNode only = Iterables.getOnlyElement(reduced);
+               return Preconditions.checkNotNull(only);
+       }
+
+       /** Removes any casts that change nullability but not type.
+        *
+        * <p>For example, {@code CAST(1 = 0 AS BOOLEAN)} becomes {@code 1 = 
0}. */
+       public RexNode removeNullabilityCast(RexNode e) {
+               while (RexUtil.isNullabilityCast(rexBuilder.getTypeFactory(), 
e)) {
+                       e = ((RexCall) e).operands.get(0);
+               }
+               return e;
+       }
+
+       private static <C extends Comparable<C>> RexNode processRange(
+                       RexBuilder rexBuilder, List<RexNode> terms,
+                       Map<String, Pair<Range<C>, List<RexNode>>> rangeTerms, 
RexNode term,
+                       RexNode ref, C v0, SqlKind comparison) {
+               Pair<Range<C>, List<RexNode>> p = 
rangeTerms.get(ref.toString());
+               if (p == null) {
+                       rangeTerms.put(ref.toString(),
+                                       Pair.of(range(comparison, v0),
+                                                       (List<RexNode>) 
ImmutableList.of(term)));
+               } else {
+                       // Exists
+                       boolean removeUpperBound = false;
+                       boolean removeLowerBound = false;
+                       Range<C> r = p.left;
+                       switch (comparison) {
+                               case EQUALS:
+                                       if (!r.contains(v0)) {
+                                               // Range is empty, not 
satisfiable
+                                               return 
rexBuilder.makeLiteral(false);
+                                       }
+                                       rangeTerms.put(ref.toString(),
+                                                       
Pair.of(Range.singleton(v0),
+                                                                       
(List<RexNode>) ImmutableList.of(term)));
+                                       // remove
+                                       for (RexNode e : p.right) {
+                                               Collections.replaceAll(terms, 
e, rexBuilder.makeLiteral(true));
+                                       }
+                                       break;
+                               case LESS_THAN: {
+                                       int comparisonResult = 0;
+                                       if (r.hasUpperBound()) {
+                                               comparisonResult = 
v0.compareTo(r.upperEndpoint());
+                                       }
+                                       if (comparisonResult <= 0) {
+                                               // 1) No upper bound, or
+                                               // 2) We need to open the upper 
bound, or
+                                               // 3) New upper bound is lower 
than old upper bound
+                                               if (r.hasLowerBound()) {
+                                                       if 
(v0.compareTo(r.lowerEndpoint()) <= 0) {
+                                                               // Range is 
empty, not satisfiable
+                                                               return 
rexBuilder.makeLiteral(false);
+                                                       }
+                                                       // a <= x < b OR a < x 
< b
+                                                       r = 
Range.range(r.lowerEndpoint(), r.lowerBoundType(),
+                                                                       v0, 
BoundType.OPEN);
+                                               } else {
+                                                       // x < b
+                                                       r = Range.lessThan(v0);
+                                               }
+
+                                               if (r.isEmpty()) {
+                                                       // Range is empty, not 
satisfiable
+                                                       return 
rexBuilder.makeLiteral(false);
+                                               }
+
+                                               // remove prev upper bound
+                                               removeUpperBound = true;
+                                       } else {
+                                               int termIndex = 
terms.indexOf(term);
+                                               if (termIndex >= 0) {
+                                                       // Remove this term as 
it is contained in current upper bound
+                                                       terms.set(termIndex, 
rexBuilder.makeLiteral(true));
+                                               }
+                                       }
+                                       break;
+                               }
+                               case LESS_THAN_OR_EQUAL: {
+                                       int comparisonResult = -1;
+                                       if (r.hasUpperBound()) {
+                                               comparisonResult = 
v0.compareTo(r.upperEndpoint());
+                                       }
+                                       if (comparisonResult < 0) {
+                                               // 1) No upper bound, or
+                                               // 2) New upper bound is lower 
than old upper bound
+                                               if (r.hasLowerBound()) {
+                                                       if 
(v0.compareTo(r.lowerEndpoint()) < 0) {
+                                                               // Range is 
empty, not satisfiable
+                                                               return 
rexBuilder.makeLiteral(false);
+                                                       }
+                                                       // a <= x <= b OR a < x 
<= b
+                                                       r = 
Range.range(r.lowerEndpoint(), r.lowerBoundType(),
+                                                                       v0, 
BoundType.CLOSED);
+                                               } else {
+                                                       // x <= b
+                                                       r = Range.atMost(v0);
+                                               }
+
+                                               if (r.isEmpty()) {
+                                                       // Range is empty, not 
satisfiable
+                                                       return 
rexBuilder.makeLiteral(false);
+                                               }
+
+                                               // remove prev upper bound
+                                               removeUpperBound = true;
+                                       } else {
+                                               int termIndex = 
terms.indexOf(term);
+                                               if (termIndex >= 0) {
+                                                       // Remove this term as 
it is contained in current upper bound
+                                                       terms.set(termIndex, 
rexBuilder.makeLiteral(true));
+                                               }
+                                       }
+                                       break;
+                               }
+                               case GREATER_THAN: {
+                                       int comparisonResult = 0;
+                                       if (r.hasLowerBound()) {
+                                               comparisonResult = 
v0.compareTo(r.lowerEndpoint());
+                                       }
+                                       if (comparisonResult >= 0) {
+                                               // 1) No lower bound, or
+                                               // 2) We need to open the lower 
bound, or
+                                               // 3) New lower bound is 
greater than old lower bound
+                                               if (r.hasUpperBound()) {
+                                                       if 
(v0.compareTo(r.upperEndpoint()) >= 0) {
+                                                               // Range is 
empty, not satisfiable
+                                                               return 
rexBuilder.makeLiteral(false);
+                                                       }
+                                                       // a < x <= b OR a < x 
< b
+                                                       r = Range.range(v0, 
BoundType.OPEN,
+                                                                       
r.upperEndpoint(), r.upperBoundType());
+                                               } else {
+                                                       // x > a
+                                                       r = 
Range.greaterThan(v0);
+                                               }
+
+                                               if (r.isEmpty()) {
+                                                       // Range is empty, not 
satisfiable
+                                                       return 
rexBuilder.makeLiteral(false);
+                                               }
+
+                                               // remove prev lower bound
+                                               removeLowerBound = true;
+                                       } else {
+                                               int termIndex = 
terms.indexOf(term);
+                                               if (termIndex >= 0) {
+                                                       // Remove this term as 
it is contained in current lower bound
+                                                       terms.set(termIndex, 
rexBuilder.makeLiteral(true));
+                                               }
+                                       }
+                                       break;
+                               }
+                               case GREATER_THAN_OR_EQUAL: {
+                                       int comparisonResult = 1;
+                                       if (r.hasLowerBound()) {
+                                               comparisonResult = 
v0.compareTo(r.lowerEndpoint());
+                                       }
+                                       if (comparisonResult > 0) {
+                                               // 1) No lower bound, or
+                                               // 2) New lower bound is 
greater than old lower bound
+                                               if (r.hasUpperBound()) {
+                                                       if 
(v0.compareTo(r.upperEndpoint()) > 0) {
+                                                               // Range is 
empty, not satisfiable
+                                                               return 
rexBuilder.makeLiteral(false);
+                                                       }
+                                                       // a <= x <= b OR a <= 
x < b
+                                                       r = Range.range(v0, 
BoundType.CLOSED,
+                                                                       
r.upperEndpoint(), r.upperBoundType());
+                                               } else {
+                                                       // x >= a
+                                                       r = Range.atLeast(v0);
+                                               }
+
+                                               if (r.isEmpty()) {
+                                                       // Range is empty, not 
satisfiable
+                                                       return 
rexBuilder.makeLiteral(false);
+                                               }
+
+                                               // remove prev lower bound
+                                               removeLowerBound = true;
+                                       } else {
+                                               int termIndex = 
terms.indexOf(term);
+                                               if (termIndex >= 0) {
+                                                       // Remove this term as 
it is contained in current lower bound
+                                                       terms.set(termIndex, 
rexBuilder.makeLiteral(true));
+                                               }
+                                       }
+                                       break;
+                               }
+                               default:
+                                       throw new AssertionError();
+                       }
+                       if (removeUpperBound) {
+                               ImmutableList.Builder<RexNode> newBounds = 
ImmutableList.builder();
+                               for (RexNode e : p.right) {
+                                       if (isUpperBound(e)) {
+                                               Collections.replaceAll(terms, 
e, rexBuilder.makeLiteral(true));
+                                       } else {
+                                               newBounds.add(e);
+                                       }
+                               }
+                               newBounds.add(term);
+                               rangeTerms.put(ref.toString(),
+                                               Pair.of(r, (List<RexNode>) 
newBounds.build()));
+                       } else if (removeLowerBound) {
+                               ImmutableList.Builder<RexNode> newBounds = 
ImmutableList.builder();
+                               for (RexNode e : p.right) {
+                                       if (isLowerBound(e)) {
+                                               Collections.replaceAll(terms, 
e, rexBuilder.makeLiteral(true));
+                                       } else {
+                                               newBounds.add(e);
+                                       }
+                               }
+                               newBounds.add(term);
+                               rangeTerms.put(ref.toString(),
+                                               Pair.of(r, (List<RexNode>) 
newBounds.build()));
+                       }
+               }
+               // Default
+               return null;
+       }
+
+       private static <C extends Comparable<C>> Range<C> range(SqlKind 
comparison,
+                                                               C c) {
+               switch (comparison) {
+                       case EQUALS:
+                               return Range.singleton(c);
+                       case LESS_THAN:
+                               return Range.lessThan(c);
+                       case LESS_THAN_OR_EQUAL:
+                               return Range.atMost(c);
+                       case GREATER_THAN:
+                               return Range.greaterThan(c);
+                       case GREATER_THAN_OR_EQUAL:
+                               return Range.atLeast(c);
+                       default:
+                               throw new AssertionError();
+               }
+       }
+
+       /**
+        * Comparison between a reference ({@link RexInputRef} or
+        * {@link RexFieldAccess}) and a literal, stored with the reference as the
+        * left-hand side.
+        *
+        * <p>In the original call the literal may be on the left or on the right
+        * side; when it is on the left, the operator kind is reversed. The
+        * {@link RexLiteral} object itself must not be a Java {@code null}.
+        * NOTE(review): the previous comment said the literal "may be null", which
+        * presumably refers to a literal holding a SQL NULL value - confirm.
+        */
+       private static class Comparison {
+               final RexNode ref;
+               final SqlKind kind;
+               final RexLiteral literal;
+
+               private Comparison(RexNode ref, SqlKind kind, RexLiteral literal) {
+                       this.ref = Preconditions.checkNotNull(ref);
+                       this.kind = Preconditions.checkNotNull(kind);
+                       this.literal = Preconditions.checkNotNull(literal);
+               }
+
+               /**
+                * Creates a comparison from {@code e}.
+                *
+                * @param e expression to analyze
+                * @return the comparison, or null if {@code e} is not a binary
+                *         comparison between a reference and a literal
+                */
+               static Comparison of(RexNode e) {
+                       // Guard: only the six binary comparison kinds qualify.
+                       switch (e.getKind()) {
+                               case EQUALS:
+                               case NOT_EQUALS:
+                               case LESS_THAN:
+                               case GREATER_THAN:
+                               case LESS_THAN_OR_EQUAL:
+                               case GREATER_THAN_OR_EQUAL:
+                                       break;
+                               default:
+                                       return null;
+                       }
+
+                       final List<RexNode> args = ((RexCall) e).getOperands();
+                       final RexNode lhs = args.get(0);
+                       final RexNode rhs = args.get(1);
+
+                       // "ref <op> literal": keep the operator as written.
+                       if (rhs.getKind() == SqlKind.LITERAL
+                                       && RexUtil.isReferenceOrAccess(lhs, true)) {
+                               return new Comparison(lhs, e.getKind(), (RexLiteral) rhs);
+                       }
+
+                       // "literal <op> ref": swap the sides and reverse the operator.
+                       if (lhs.getKind() == SqlKind.LITERAL
+                                       && RexUtil.isReferenceOrAccess(rhs, true)) {
+                               return new Comparison(rhs, e.getKind().reverse(),
+                                               (RexLiteral) lhs);
+                       }
+
+                       return null;
+               }
+       }
+
+       /**
+        * Tells whether {@code e} bounds a reference from above, i.e. has one of
+        * the shapes {@code ref < literal}, {@code ref <= literal},
+        * {@code literal > ref} or {@code literal >= ref}.
+        *
+        * @param e expression to inspect
+        * @return true if {@code e} has one of these shapes; false for any other
+        *         kind or operand arrangement
+        */
+       private static boolean isUpperBound(final RexNode e) {
+               // Index of the operand that must be the reference; the other operand
+               // must then be the literal.
+               final int refPos;
+               switch (e.getKind()) {
+                       case LESS_THAN:
+                       case LESS_THAN_OR_EQUAL:
+                               refPos = 0;
+                               break;
+                       case GREATER_THAN:
+                       case GREATER_THAN_OR_EQUAL:
+                               refPos = 1;
+                               break;
+                       default:
+                               return false;
+               }
+               final List<RexNode> args = ((RexCall) e).getOperands();
+               return RexUtil.isReferenceOrAccess(args.get(refPos), true)
+                               && args.get(1 - refPos).isA(SqlKind.LITERAL);
+       }
+
+       /**
+        * Tells whether {@code e} bounds a reference from below, i.e. has one of
+        * the shapes {@code ref > literal}, {@code ref >= literal},
+        * {@code literal < ref} or {@code literal <= ref}.
+        *
+        * @param e expression to inspect
+        * @return true if {@code e} has one of these shapes; false for any other
+        *         kind or operand arrangement
+        */
+       private static boolean isLowerBound(final RexNode e) {
+               // Index of the operand that must be the reference; the other operand
+               // must then be the literal.
+               final int refPos;
+               switch (e.getKind()) {
+                       case LESS_THAN:
+                       case LESS_THAN_OR_EQUAL:
+                               refPos = 1;
+                               break;
+                       case GREATER_THAN:
+                       case GREATER_THAN_OR_EQUAL:
+                               refPos = 0;
+                               break;
+                       default:
+                               return false;
+               }
+               final List<RexNode> args = ((RexCall) e).getOperands();
+               return RexUtil.isReferenceOrAccess(args.get(refPos), true)
+                               && args.get(1 - refPos).isA(SqlKind.LITERAL);
+       }
+}
+
+// End RexSimplify.java

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/SqlGroupedWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/SqlGroupedWindowFunction.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/SqlGroupedWindowFunction.java
new file mode 100644
index 0000000..0f661e1
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/SqlGroupedWindowFunction.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlOperandTypeInference;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL 
CALCITE-2133 IS FIXED.
+ */
+
+/**
+ * SQL function that computes keys by which rows can be partitioned and
+ * aggregated.
+ *
+ * <p>Grouped window functions always occur in the GROUP BY clause. They often
+ * have auxiliary functions that access information about the group. For
+ * example, {@code HOP} is a group function, and its auxiliary functions are
+ * {@code HOP_START} and {@code HOP_END}. Here they are used in a streaming
+ * query:
+ *
+ * <blockquote><pre>
+ * SELECT STREAM HOP_START(rowtime, INTERVAL '1' HOUR),
+ *   HOP_END(rowtime, INTERVAL '1' HOUR),
+ *   MIN(unitPrice)
+ * FROM Orders
+ * GROUP BY HOP(rowtime, INTERVAL '1' HOUR), productId
+ * </pre></blockquote>
+ */
+public class SqlGroupedWindowFunction extends SqlFunction {
+       /** The grouped function, if this is an auxiliary function; null otherwise. */
+       public final SqlGroupedWindowFunction groupFunction;
+
+       /** Creates a SqlGroupedWindowFunction.
+        *
+        * @param name Function name
+        * @param kind Kind
+        * @param groupFunction Group function, if this is an auxiliary;
+        *                      null, if this is a group function
+        * @param returnTypeInference  Strategy to use for return type inference
+        * @param operandTypeInference Strategy to use for parameter type inference
+        * @param operandTypeChecker   Strategy to use for parameter type checking
+        * @param category             Categorization for function
+        * @throws IllegalArgumentException if {@code groupFunction} is itself an
+        *                                  auxiliary function
+        */
+       public SqlGroupedWindowFunction(String name, SqlKind kind,
+                                       SqlGroupedWindowFunction groupFunction,
+                                       SqlReturnTypeInference returnTypeInference,
+                                       SqlOperandTypeInference operandTypeInference,
+                                       SqlOperandTypeChecker operandTypeChecker,
+                                       SqlFunctionCategory category) {
+               super(name, kind, returnTypeInference, operandTypeInference,
+                               operandTypeChecker, category);
+               // An auxiliary function must hang off a real group function, never
+               // off another auxiliary function.
+               Preconditions.checkArgument(
+                               groupFunction == null || groupFunction.groupFunction == null,
+                               "groupFunction must not itself be an auxiliary function");
+               this.groupFunction = groupFunction;
+       }
+
+       /** Creates a SqlGroupedWindowFunction whose return type is that of its
+        * first argument, without operand type inference, in the SYSTEM category.
+        *
+        * @param name Function name
+        * @param kind Kind
+        * @param groupFunction Group function, if this is an auxiliary;
+        *                      null, if this is a group function
+        * @param operandTypeChecker Strategy to use for parameter type checking
+        * @deprecated Use the constructor that takes explicit type inference
+        *             strategies and a function category.
+        */
+       @Deprecated // to be removed before 2.0
+       public SqlGroupedWindowFunction(String name, SqlKind kind,
+                                       SqlGroupedWindowFunction groupFunction,
+                                       SqlOperandTypeChecker operandTypeChecker) {
+               this(name, kind, groupFunction, ReturnTypes.ARG0, null, operandTypeChecker,
+                               SqlFunctionCategory.SYSTEM);
+       }
+
+       /** Creates a SqlGroupedWindowFunction named after its kind, whose return
+        * type is that of its first argument, without operand type inference, in
+        * the SYSTEM category.
+        *
+        * @param kind Kind; also determines function name
+        * @param groupFunction Group function, if this is an auxiliary;
+        *                      null, if this is a group function
+        * @param operandTypeChecker Strategy to use for parameter type checking
+        * @deprecated Use the constructor that takes explicit type inference
+        *             strategies and a function category.
+        */
+       @Deprecated // to be removed before 2.0
+       public SqlGroupedWindowFunction(SqlKind kind,
+                                       SqlGroupedWindowFunction groupFunction,
+                                       SqlOperandTypeChecker operandTypeChecker) {
+               this(kind.name(), kind, groupFunction, ReturnTypes.ARG0, null,
+                               operandTypeChecker, SqlFunctionCategory.SYSTEM);
+       }
+
+       /** Creates an auxiliary function from this grouped window function.
+        *
+        * @param kind Kind; also determines function name
+        * @return the auxiliary function, with this function as its group function
+        */
+       public SqlGroupedWindowFunction auxiliary(SqlKind kind) {
+               return auxiliary(kind.name(), kind);
+       }
+
+       /** Creates an auxiliary function from this grouped window function.
+        *
+        * <p>The auxiliary function reuses this function's operand type checker
+        * and returns the type of its first argument.
+        *
+        * @param name Function name
+        * @param kind Kind
+        * @return the auxiliary function, with this function as its group function
+        */
+       public SqlGroupedWindowFunction auxiliary(String name, SqlKind kind) {
+               // Call the primary constructor directly, with the same arguments the
+               // deprecated four-argument constructor would have forwarded, so this
+               // method keeps working once that constructor is removed.
+               return new SqlGroupedWindowFunction(name, kind, this, ReturnTypes.ARG0,
+                               null, getOperandTypeChecker(), SqlFunctionCategory.SYSTEM);
+       }
+
+       /** Returns a list of this grouped window function's auxiliary functions.
+        *
+        * <p>This implementation always returns an empty list.
+        */
+       public List<SqlGroupedWindowFunction> getAuxiliaryFunctions() {
+               return ImmutableList.of();
+       }
+
+       @Override public boolean isGroup() {
+               // Auxiliary functions are not group functions
+               return groupFunction == null;
+       }
+
+       @Override public boolean isGroupAuxiliary() {
+               return groupFunction != null;
+       }
+
+       @Override public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
+               // Monotonic iff its first argument is, but not strict.
+               //
+               // Note: This strategy happens to work for all current group functions
+               // (HOP, TUMBLE, SESSION). When there are exceptions to this rule, we'll
+               // make the method abstract.
+               return call.getOperandMonotonicity(0).unstrict();
+       }
+}
+
+// End SqlGroupedWindowFunction.java

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
deleted file mode 100644
index 0bb26da..0000000
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.sql.fun;
-
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlOperatorBinding;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlOperandTypeChecker;
-import org.apache.calcite.sql.type.SqlReturnTypeInference;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-
-/*
- * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL 
CALCITE-1867 IS FIXED.
- */
-
-/**
- * SQL function that computes keys by which rows can be partitioned and
- * aggregated.
- *
- * <p>Grouped window functions always occur in the GROUP BY clause. They often
- * have auxiliary functions that access information about the group. For
- * example, {@code HOP} is a group function, and its auxiliary functions are
- * {@code HOP_START} and {@code HOP_END}. Here they are used in a streaming
- * query:
- *
- * <blockquote><pre>
- * SELECT STREAM HOP_START(rowtime, INTERVAL '1' HOUR),
- *   HOP_END(rowtime, INTERVAL '1' HOUR),
- *   MIN(unitPrice)
- * FROM Orders
- * GROUP BY HOP(rowtime, INTERVAL '1' HOUR), productId
- * </pre></blockquote>
- */
-public class SqlGroupFunction extends SqlFunction {
-  /** The grouped function, if this an auxiliary function; null otherwise. */
-  final SqlGroupFunction groupFunction;
-
-  /** Creates a SqlGroupFunction.
-   *
-   * @param name Function name
-   * @param kind Kind
-   * @param groupFunction Group function, if this is an auxiliary;
-   *                      null, if this is a group function
-   * @param operandTypeChecker Operand type checker
-   */
-  public SqlGroupFunction(String name, SqlKind kind, SqlGroupFunction 
groupFunction,
-      SqlOperandTypeChecker operandTypeChecker) {
-    super(name, kind, ReturnTypes.ARG0, null,
-        operandTypeChecker, SqlFunctionCategory.SYSTEM);
-    this.groupFunction = groupFunction;
-    if (groupFunction != null) {
-      assert groupFunction.groupFunction == null;
-    }
-  }
-
-  /** Creates a SqlGroupFunction.
-   *
-   * @param kind Kind; also determines function name
-   * @param groupFunction Group function, if this is an auxiliary;
-   *                      null, if this is a group function
-   * @param operandTypeChecker Operand type checker
-   */
-  public SqlGroupFunction(SqlKind kind, SqlGroupFunction groupFunction,
-      SqlOperandTypeChecker operandTypeChecker) {
-    this(kind.name(), kind, groupFunction, operandTypeChecker);
-  }
-
-       /** Creates a SqlGroupFunction.
-        *
-        * @param name Function name
-        * @param kind Kind
-        * @param groupFunction Group function, if this is an auxiliary;
-        *                      null, if this is a group function
-        * @param returnTypeInference Inference of the functions return type
-        * @param operandTypeChecker Operand type checker
-        */
-  public SqlGroupFunction(String name, SqlKind kind, SqlGroupFunction 
groupFunction,
-      SqlReturnTypeInference returnTypeInference, SqlOperandTypeChecker 
operandTypeChecker) {
-    super(name, kind, returnTypeInference, null, operandTypeChecker,
-      SqlFunctionCategory.SYSTEM);
-    this.groupFunction = groupFunction;
-    if (groupFunction != null) {
-      assert groupFunction.groupFunction == null;
-    }
-  }
-
-  /** Creates an auxiliary function from this grouped window function.
-   *
-   * @param kind Kind; also determines function name
-   */
-  public SqlGroupFunction auxiliary(SqlKind kind) {
-    return auxiliary(kind.name(), kind);
-  }
-
-  /** Creates an auxiliary function from this grouped window function.
-   *
-   * @param name Function name
-   * @param kind Kind
-   */
-  public SqlGroupFunction auxiliary(String name, SqlKind kind) {
-    return new SqlGroupFunction(name, kind, this, getOperandTypeChecker());
-  }
-
-  /** Returns a list of this grouped window function's auxiliary functions. */
-  public List<SqlGroupFunction> getAuxiliaryFunctions() {
-    return ImmutableList.of();
-  }
-
-  @Override public boolean isGroup() {
-    // Auxiliary functions are not group functions
-    return groupFunction == null;
-  }
-
-  @Override public boolean isGroupAuxiliary() {
-    return groupFunction != null;
-  }
-
-  @Override public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
-    // Monotonic iff its first argument is, but not strict.
-    //
-    // Note: This strategy happens to works for all current group functions
-    // (HOP, TUMBLE, SESSION). When there are exceptions to this rule, we'll
-    // make the method abstract.
-    return call.getOperandMonotonicity(0).unstrict();
-  }
-}
-
-// End SqlGroupFunction.java

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index db7ffdb..515a36d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -322,7 +322,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 
 object FlinkTypeFactory {
 
-  private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName 
= typeInfo match {
+  def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = 
typeInfo match {
       case BOOLEAN_TYPE_INFO => BOOLEAN
       case BYTE_TYPE_INFO => TINYINT
       case SHORT_TYPE_INFO => SMALLINT

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala
new file mode 100644
index 0000000..fe72733
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import java.lang.reflect.Method
+
+import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
+import 
org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+
+/**
+  * Call generator for extracting a time unit from a temporal operand.
+  *
+  * Depending on the requested unit and on the SQL type of the second operand,
+  * the generated code either calls `method`, delegates to [[MethodCallGen]],
+  * or is a plain modulo/division expression on the operand value.
+  *
+  * @param returnType result type of the generated expression
+  * @param method     method invoked for the units handled by a method call
+  */
+class ExtractCallGen(returnType: TypeInformation[_], method: Method)
+  extends MethodCallGen(returnType, method) {
+
+  /**
+    * Generates the extraction expression.
+    *
+    * @param codeGenerator code generator providing the null-check setting
+    * @param operands      operands(0) is read as a [[TimeUnitRange]] enum,
+    *                      operands(1) is the value to extract from
+    * @return the generated expression
+    */
+  override def generate(codeGenerator: CodeGenerator, operands: 
Seq[GeneratedExpression])
+  : GeneratedExpression = {
+    // only the start unit of the requested range is considered
+    val unit = getEnum(operands(0)).asInstanceOf[TimeUnitRange].startUnit
+    val sqlTypeName = 
FlinkTypeFactory.typeInfoToSqlTypeName(operands(1).resultType)
+    // For the units listed here, TIMESTAMP and DATE operands are handled by a call to
+    // `method`; every other unit/type combination falls through to the arithmetic below.
+    unit match {
+      case TimeUnit.YEAR |
+           TimeUnit.MONTH |
+           TimeUnit.DAY |
+           TimeUnit.QUARTER |
+           TimeUnit.DOW |
+           TimeUnit.DOY |
+           TimeUnit.WEEK |
+           TimeUnit.CENTURY |
+           TimeUnit.MILLENNIUM=>
+        sqlTypeName match {
+          // the operand is divided by the DAY multiplier before it is passed to `method`
+          // NOTE(review): presumably converts epoch millis to epoch days - confirm
+          case SqlTypeName.TIMESTAMP =>
+            return generateCallIfArgsNotNull(codeGenerator.nullCheck, 
returnType, operands) {
+              (terms) =>
+                s"""
+                   |${qualifyMethod(method)}(${terms.head},
+                   |    ${terms(1)} / ${TimeUnit.DAY.multiplier.intValue()})
+                   |""".stripMargin
+            }
+
+          // the operand is passed to `method` unchanged via the parent generator
+          case SqlTypeName.DATE =>
+            return super.generate(codeGenerator, operands)
+
+          case _ => // do nothing
+        }
+
+      case _ => // do nothing
+    }
+    // Fallback: compute the unit arithmetically from the operand value. getFactor throws
+    // a CodeGenException for units it does not know.
+    generateCallIfArgsNotNull(codeGenerator.nullCheck, returnType, operands) {
+      (terms) => {
+        val factor = getFactor(unit)
+        unit match {
+          case TimeUnit.QUARTER =>
+            s"""
+               |((${terms(1)} % ${factor}) - 1) / 
${unit.multiplier.intValue()} + 1
+               |""".stripMargin
+          case _ =>
+            // a factor of 1 means no modulo is applied, only the division
+            if (factor == 1) {
+              s"""
+                 |${terms(1)} / ${unit.multiplier.intValue()}
+                 |""".stripMargin
+            } else {
+              s"""
+                 |(${terms(1)} % ${factor}) / ${unit.multiplier.intValue()}
+                 |""".stripMargin
+            }
+        }
+      }
+    }
+  }
+
+  /**
+    * Returns the modulus applied to the operand value before it is divided by the unit's
+    * own multiplier; a result of 1 means that no modulo is applied.
+    *
+    * @param unit unit to extract
+    * @return multiplier of the next larger unit, or 1
+    * @throws CodeGenException if the unit is not one of the cases below
+    */
+  private def getFactor(unit: TimeUnit): Long = {
+    unit match {
+      case TimeUnit.DAY =>
+        1L
+      case TimeUnit.HOUR =>
+        TimeUnit.DAY.multiplier.longValue()
+      case TimeUnit.MINUTE =>
+        TimeUnit.HOUR.multiplier.longValue()
+      case TimeUnit.SECOND =>
+        TimeUnit.MINUTE.multiplier.longValue()
+      case TimeUnit.MONTH =>
+        TimeUnit.YEAR.multiplier.longValue()
+      case TimeUnit.QUARTER =>
+        TimeUnit.YEAR.multiplier.longValue()
+      case TimeUnit.YEAR |
+           TimeUnit.DECADE |
+           TimeUnit.CENTURY |
+           TimeUnit.MILLENNIUM => 1L
+      case _ => throw new CodeGenException("unit %s is NOT 
supported.".format(unit))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 412cdfc..9cd67c8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.codegen.calls
 
 import java.lang.reflect.Method
 
+import org.apache.calcite.avatica.SqlType
 import org.apache.calcite.avatica.util.TimeUnitRange
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
@@ -31,6 +32,7 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.table.functions.sql.ScalarSqlFunctions
 import org.apache.flink.table.functions.sql.ScalarSqlFunctions._
 import org.apache.flink.table.functions.utils.{ScalarSqlFunction, 
TableSqlFunction}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
 import scala.collection.mutable
 
@@ -428,17 +430,36 @@ object FunctionGenerator {
   // Temporal functions
   // ----------------------------------------------------------------------------------------------
 
-  addSqlFunctionMethod(
-    EXTRACT_DATE,
+  addSqlFunction(
+    EXTRACT,
     Seq(new GenericTypeInfo(classOf[TimeUnitRange]), LONG_TYPE_INFO),
-    LONG_TYPE_INFO,
-    BuiltInMethod.UNIX_DATE_EXTRACT.method)
+    new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method))
 
-  addSqlFunctionMethod(
-    EXTRACT_DATE,
+  addSqlFunction(
+    EXTRACT,
+    Seq(new GenericTypeInfo(classOf[TimeUnitRange]), TimeIntervalTypeInfo.INTERVAL_MILLIS),
+    new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method))
+
+  addSqlFunction(
+    EXTRACT,
+    Seq(new GenericTypeInfo(classOf[TimeUnitRange]), SqlTimeTypeInfo.TIMESTAMP),
+    new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method))
+
+
+  addSqlFunction(
+    EXTRACT,
+    Seq(new GenericTypeInfo(classOf[TimeUnitRange]), SqlTimeTypeInfo.TIME),
+    new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method))
+
+  addSqlFunction(
+    EXTRACT,
+    Seq(new GenericTypeInfo(classOf[TimeUnitRange]), TimeIntervalTypeInfo.INTERVAL_MONTHS),
+    new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method))
+
+  addSqlFunction(
+    EXTRACT,
     Seq(new GenericTypeInfo(classOf[TimeUnitRange]), SqlTimeTypeInfo.DATE),
-    LONG_TYPE_INFO,
-    BuiltInMethod.UNIX_DATE_EXTRACT.method)
+    new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method))
 
   addSqlFunction(
     FLOOR,

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index 3adaaa9..51526b2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -136,7 +136,7 @@ case class Count(child: Expression) extends Aggregation {
   override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
-    new SqlCountAggFunction()
+    new SqlCountAggFunction("COUNT")
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index a69c1d4..5d75cd4 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.expressions
 
 import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
 import org.apache.calcite.rex._
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.SqlFunctionCategory
+import org.apache.calcite.sql.`type`.{SqlOperandTypeChecker, SqlTypeName}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
@@ -81,62 +82,26 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
       .enum
       .asInstanceOf[TimeUnitRange]
 
+    relBuilder.getRexBuilder
     // convert RexNodes
-    convertExtract(
+    convertFunction(
       timeIntervalUnit.toRexNode,
-      timeUnitRange,
       temporal.toRexNode,
       relBuilder.asInstanceOf[FlinkRelBuilder])
   }
 
-  /**
-    * Standard conversion of the EXTRACT operator.
-    * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertExtract()]]
-    */
-  private def convertExtract(
-      timeUnitRangeRexNode: RexNode,
-      timeUnitRange: TimeUnitRange,
-      temporal: RexNode,
-      relBuilder: FlinkRelBuilder)
-    : RexNode = {
-
-    // TODO convert this into Table API expressions to make the code more readable
+  // Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertFunction()]]
+  private def convertFunction(timeUnitRangeRexNode: RexNode,
+                              temporal: RexNode,
+                              relBuilder: FlinkRelBuilder): RexNode = {
     val rexBuilder = relBuilder.getRexBuilder
     val resultType = relBuilder
       .getTypeFactory()
       .createTypeFromTypeInfo(LONG_TYPE_INFO, isNullable = true)
-    var result = rexBuilder.makeReinterpretCast(
+    rexBuilder.makeCall(
       resultType,
-      temporal,
-      rexBuilder.makeLiteral(false))
-
-    val unit = timeUnitRange.startUnit
-    val sqlTypeName = temporal.getType.getSqlTypeName
-    unit match {
-      case TimeUnit.YEAR | TimeUnit.MONTH | TimeUnit.DAY =>
-        sqlTypeName match {
-          case SqlTypeName.TIMESTAMP =>
-            result = divide(rexBuilder, result, TimeUnit.DAY.multiplier)
-            return rexBuilder.makeCall(
-              resultType,
-              SqlStdOperatorTable.EXTRACT_DATE,
-              Seq(timeUnitRangeRexNode, result))
-
-          case SqlTypeName.DATE =>
-            return rexBuilder.makeCall(
-              resultType,
-              SqlStdOperatorTable.EXTRACT_DATE,
-              Seq(timeUnitRangeRexNode, result))
-
-          case _ => // do nothing
-        }
-
-      case _ => // do nothing
-    }
-
-    result = mod(rexBuilder, resultType, result, getFactor(unit))
-    result = divide(rexBuilder, result, unit.multiplier)
-    result
+      SqlStdOperatorTable.EXTRACT,
+      Seq(timeUnitRangeRexNode, temporal))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index add393d..773b2f0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.table.validate
 
 import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeTransforms}
-import org.apache.calcite.sql.fun.{SqlGroupFunction, SqlStdOperatorTable}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable}
-import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlOperator, SqlOperatorTable}
+import org.apache.calcite.sql._
 import org.apache.flink.table.api._
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.sql.ScalarSqlFunctions
@@ -368,7 +368,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable 
{
     SqlStdOperatorTable.SIMILAR_TO,
     SqlStdOperatorTable.CASE,
     SqlStdOperatorTable.REINTERPRET,
-    SqlStdOperatorTable.EXTRACT_DATE,
+    SqlStdOperatorTable.EXTRACT,
     SqlStdOperatorTable.IN,
     // FUNCTIONS
     SqlStdOperatorTable.SUBSTRING,
@@ -455,77 +455,83 @@ object BasicOperatorTable {
     * We need custom group auxiliary functions in order to support nested windows.
     */
 
-  val TUMBLE: SqlGroupFunction = new SqlGroupFunction(
+  val TUMBLE: SqlGroupedWindowFunction = new SqlGroupedWindowFunction(
     SqlKind.TUMBLE,
     null,
     OperandTypes.or(OperandTypes.DATETIME_INTERVAL, 
OperandTypes.DATETIME_INTERVAL_TIME)) {
-    override def getAuxiliaryFunctions: 
_root_.java.util.List[SqlGroupFunction] =
+    override def getAuxiliaryFunctions: 
_root_.java.util.List[SqlGroupedWindowFunction] =
       Seq(
         TUMBLE_START,
         TUMBLE_END,
         TUMBLE_ROWTIME,
         TUMBLE_PROCTIME)
   }
-  val TUMBLE_START: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_START)
-  val TUMBLE_END: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_END)
-  val TUMBLE_ROWTIME: SqlGroupFunction =
-    new SqlGroupFunction(
+  val TUMBLE_START: SqlGroupedWindowFunction = 
TUMBLE.auxiliary(SqlKind.TUMBLE_START)
+  val TUMBLE_END: SqlGroupedWindowFunction = 
TUMBLE.auxiliary(SqlKind.TUMBLE_END)
+  val TUMBLE_ROWTIME: SqlGroupedWindowFunction =
+    new SqlGroupedWindowFunction(
       "TUMBLE_ROWTIME",
       SqlKind.OTHER_FUNCTION,
       TUMBLE,
       // ensure that returned rowtime is always NOT_NULLABLE
       ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE),
-      TUMBLE.getOperandTypeChecker)
-  val TUMBLE_PROCTIME: SqlGroupFunction =
+      null,
+      TUMBLE.getOperandTypeChecker,
+      SqlFunctionCategory.SYSTEM)
+  val TUMBLE_PROCTIME: SqlGroupedWindowFunction =
     TUMBLE.auxiliary("TUMBLE_PROCTIME", SqlKind.OTHER_FUNCTION)
 
-  val HOP: SqlGroupFunction = new SqlGroupFunction(
+  val HOP: SqlGroupedWindowFunction = new SqlGroupedWindowFunction(
     SqlKind.HOP,
     null,
     OperandTypes.or(
       OperandTypes.DATETIME_INTERVAL_INTERVAL,
       OperandTypes.DATETIME_INTERVAL_INTERVAL_TIME)) {
-    override def getAuxiliaryFunctions: 
_root_.java.util.List[SqlGroupFunction] =
+    override def getAuxiliaryFunctions: 
_root_.java.util.List[SqlGroupedWindowFunction] =
       Seq(
         HOP_START,
         HOP_END,
         HOP_ROWTIME,
         HOP_PROCTIME)
   }
-  val HOP_START: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_START)
-  val HOP_END: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_END)
-  val HOP_ROWTIME: SqlGroupFunction =
-    new SqlGroupFunction(
+  val HOP_START: SqlGroupedWindowFunction = HOP.auxiliary(SqlKind.HOP_START)
+  val HOP_END: SqlGroupedWindowFunction = HOP.auxiliary(SqlKind.HOP_END)
+  val HOP_ROWTIME: SqlGroupedWindowFunction =
+    new SqlGroupedWindowFunction(
       "HOP_ROWTIME",
       SqlKind.OTHER_FUNCTION,
       HOP,
       // ensure that returned rowtime is always NOT_NULLABLE
       ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE),
-      HOP.getOperandTypeChecker)
-  val HOP_PROCTIME: SqlGroupFunction = HOP.auxiliary("HOP_PROCTIME", 
SqlKind.OTHER_FUNCTION)
+      null,
+      HOP.getOperandTypeChecker,
+      SqlFunctionCategory.SYSTEM)
+  val HOP_PROCTIME: SqlGroupedWindowFunction = HOP.auxiliary("HOP_PROCTIME", 
SqlKind.OTHER_FUNCTION)
 
-  val SESSION: SqlGroupFunction = new SqlGroupFunction(
+  val SESSION: SqlGroupedWindowFunction = new SqlGroupedWindowFunction(
     SqlKind.SESSION,
     null,
     OperandTypes.or(OperandTypes.DATETIME_INTERVAL, 
OperandTypes.DATETIME_INTERVAL_TIME)) {
-    override def getAuxiliaryFunctions: 
_root_.java.util.List[SqlGroupFunction] =
+    override def getAuxiliaryFunctions: 
_root_.java.util.List[SqlGroupedWindowFunction] =
       Seq(
         SESSION_START,
         SESSION_END,
         SESSION_ROWTIME,
         SESSION_PROCTIME)
   }
-  val SESSION_START: SqlGroupFunction = 
SESSION.auxiliary(SqlKind.SESSION_START)
-  val SESSION_END: SqlGroupFunction = SESSION.auxiliary(SqlKind.SESSION_END)
-  val SESSION_ROWTIME: SqlGroupFunction =
-    new SqlGroupFunction(
+  val SESSION_START: SqlGroupedWindowFunction = 
SESSION.auxiliary(SqlKind.SESSION_START)
+  val SESSION_END: SqlGroupedWindowFunction = 
SESSION.auxiliary(SqlKind.SESSION_END)
+  val SESSION_ROWTIME: SqlGroupedWindowFunction =
+    new SqlGroupedWindowFunction(
       "SESSION_ROWTIME",
       SqlKind.OTHER_FUNCTION,
       SESSION,
       // ensure that returned rowtime is always NOT_NULLABLE
       ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE),
-      SESSION.getOperandTypeChecker)
-  val SESSION_PROCTIME: SqlGroupFunction =
+      null,
+      SESSION.getOperandTypeChecker,
+      SqlFunctionCategory.SYSTEM)
+  val SESSION_PROCTIME: SqlGroupedWindowFunction =
     SESSION.auxiliary("SESSION_PROCTIME", SqlKind.OTHER_FUNCTION)
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
index 59aee9f..8d06bcd 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
@@ -299,7 +299,7 @@ class GroupWindowTest extends TableTestBase {
         term("select", "EXPR$0", "CAST(w$start) AS EXPR$1"),
         term("where",
           "AND(>($f1, 0), " +
-            "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(CAST(w$start)), 86400000)), 1))")
+            "=(EXTRACT(FLAG(QUARTER), CAST(w$start)), 1))")
       )
 
     util.verifySql(sql, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
index bba1a5b..5bdd9dc 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
@@ -235,6 +235,8 @@ class CalcTest extends TableTestBase {
     util.verifyTable(resultTable, expected)
   }
 
+  // As stated in https://issues.apache.org/jira/browse/CALCITE-1584, Calcite planner doesn't
+  // promise to retain field names.
   @Test
   def testSelectFromGroupedTableWithNonTrivialKey(): Unit = {
     val util = batchTestUtil()
@@ -249,10 +251,10 @@ class CalcTest extends TableTestBase {
           unaryNode(
             "DataSetCalc",
             batchTableNode(0),
-            term("select", "a", "c", "UPPER(c) AS k")
+            term("select", "a", "c", "UPPER(c) AS $f2")
           ),
-          term("groupBy", "k"),
-          term("select", "k", "SUM(a) AS TMP_0")
+          term("groupBy", "$f2"),
+          term("select", "$f2", "SUM(a) AS TMP_0")
         ),
         term("select", "TMP_0")
       )
@@ -260,6 +262,8 @@ class CalcTest extends TableTestBase {
     util.verifyTable(resultTable, expected)
   }
 
+  // As stated in https://issues.apache.org/jira/browse/CALCITE-1584, Calcite planner doesn't
+  // promise to retain field names.
   @Test
   def testSelectFromGroupedTableWithFunctionKey(): Unit = {
     val util = batchTestUtil()
@@ -274,10 +278,10 @@ class CalcTest extends TableTestBase {
           unaryNode(
             "DataSetCalc",
             batchTableNode(0),
-            term("select", "a", "c", "MyHashCode$(c) AS k")
+            term("select", "a", "c", "MyHashCode$(c) AS $f2")
           ),
-          term("groupBy", "k"),
-          term("select", "k", "SUM(a) AS TMP_0")
+          term("groupBy", "$f2"),
+          term("select", "$f2", "SUM(a) AS TMP_0")
         ),
         term("select", "TMP_0")
       )

http://git-wip-us.apache.org/repos/asf/flink/blob/4816a6e7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
index e49a63f..d7d5f1e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
@@ -201,7 +201,7 @@ class GroupWindowTest extends TableTestBase {
         term("select", "EXPR$0", "w$start AS EXPR$1"),
         term("where",
           "AND(>($f1, 0), " +
-            "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(w$start), 86400000)), 1))")
+            "=(EXTRACT(FLAG(QUARTER), w$start), 1))")
       )
 
     streamUtil.verifySql(sql, expected)

Reply via email to