[FLINK-6429] [table] Bump Calcite version to 1.13.

This closes #4373.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5770fe8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5770fe8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5770fe8

Branch: refs/heads/master
Commit: d5770fe8dd1486d457c87c17a7df8dba276e9bcd
Parents: f471888
Author: Haohui Mai <whe...@apache.org>
Authored: Wed Jul 19 14:34:37 2017 -0700
Committer: twalthr <twal...@apache.org>
Committed: Tue Aug 8 12:56:34 2017 +0200

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |    2 +-
 .../calcite/avatica/util/DateTimeUtils.java     | 1044 ++++
 .../apache/calcite/rel/rules/PushProjector.java |  868 +++
 .../calcite/sql/fun/SqlGroupFunction.java       |  103 -
 .../calcite/sql/fun/SqlStdOperatorTable.java    | 2133 -------
 .../apache/calcite/sql/validate/AggChecker.java |  225 -
 .../sql/validate/SqlUserDefinedAggFunction.java |   82 -
 .../calcite/sql2rel/SqlToRelConverter.java      | 5356 ------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  |    2 +-
 .../flink/table/calcite/FlinkTypeSystem.scala   |    2 +-
 .../table/catalog/ExternalCatalogSchema.scala   |    2 +
 .../flink/table/codegen/CodeGenerator.scala     |    3 +
 .../apache/flink/table/expressions/call.scala   |    1 +
 .../apache/flink/table/expressions/time.scala   |   44 +-
 .../table/functions/utils/AggSqlFunction.scala  |    1 +
 .../flink/table/plan/rules/FlinkRuleSets.scala  |    4 +-
 .../flink/table/plan/stats/FlinkStatistic.scala |    5 +-
 .../table/plan/util/RexProgramExtractor.scala   |    3 +
 .../table/api/batch/sql/CorrelateTest.scala     |   24 +-
 .../table/api/batch/table/CorrelateTest.scala   |    6 +-
 .../table/api/stream/sql/CorrelateTest.scala    |   24 +-
 .../table/api/stream/table/CorrelateTest.scala  |   24 +-
 .../table/expressions/ScalarFunctionsTest.scala |    5 +-
 .../plan/TimeIndicatorConversionTest.scala      |    2 +-
 24 files changed, 2002 insertions(+), 7963 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 8a7e3ac..0e943ad 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -52,7 +52,7 @@ under the License.
                <dependency>
                        <groupId>org.apache.calcite</groupId>
                        <artifactId>calcite-core</artifactId>
-                       <version>1.12.0</version>
+                       <version>1.13.0</version>
                        <exclusions>
                                <exclusion>
                                        
<groupId>org.apache.calcite.avatica</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
new file mode 100644
index 0000000..d1a87a7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
@@ -0,0 +1,1044 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.util;
+
+import java.text.DateFormat;
+import java.text.NumberFormat;
+import java.text.ParsePosition;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL CALCITE-1884 IS FIXED.
+ */
+
+/**
+ * Utility functions for datetime types: date, time, timestamp.
+ *
+ * <p>Used by the JDBC driver.
+ *
+ * <p>TODO: review methods for performance. Due to allocations required, it may
+ * be preferable to introduce a "formatter" with the required state.
+ */
+public class DateTimeUtils {
+       /** The julian date of the epoch, 1970-01-01. */
+       public static final int EPOCH_JULIAN = 2440588;
+
+       private DateTimeUtils() {}
+
+       //~ Static fields/initializers ---------------------------------------------
+
+       /** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
+       public static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
+
+       /** The SimpleDateFormat string for ISO times, "HH:mm:ss". */
+       public static final String TIME_FORMAT_STRING = "HH:mm:ss";
+
+       /** The SimpleDateFormat string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". */
+       public static final String TIMESTAMP_FORMAT_STRING =
+               DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;
+
+       /** The GMT time zone.
+        *
+        * @deprecated Use {@link #UTC_ZONE} */
+       @Deprecated // to be removed before 2.0
+       public static final TimeZone GMT_ZONE = TimeZone.getTimeZone("GMT");
+
+       /** The UTC time zone. */
+       public static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC");
+
+       /** The Java default time zone. */
+       public static final TimeZone DEFAULT_ZONE = TimeZone.getDefault();
+
+       /**
+        * The number of milliseconds in a second.
+        */
+       public static final long MILLIS_PER_SECOND = 1000L;
+
+       /**
+        * The number of milliseconds in a minute.
+        */
+       public static final long MILLIS_PER_MINUTE = 60000L;
+
+       /**
+        * The number of milliseconds in an hour.
+        */
+       public static final long MILLIS_PER_HOUR = 3600000L; // = 60 * 60 * 1000
+
+       /**
+        * The number of milliseconds in a day.
+        *
+        * <p>This is the modulo 'mask' used when converting
+        * TIMESTAMP values to DATE and TIME values.
+        */
+       public static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 
1000
+
+       /**
+        * Calendar set to the epoch (1970-01-01 00:00:00 UTC). Useful for
+        * initializing other values. Calendars are not immutable, so be careful not
+        * to screw up this object for everyone else.
+        */
+       public static final Calendar ZERO_CALENDAR;
+
+       static {
+               ZERO_CALENDAR = Calendar.getInstance(DateTimeUtils.UTC_ZONE, 
Locale.ROOT);
+               ZERO_CALENDAR.setTimeInMillis(0);
+       }
+
+       //~ Methods ----------------------------------------------------------------
+
+       /**
+        * Parses a string using {@link SimpleDateFormat} and a given pattern. This
+        * method parses a string at the specified parse position and if successful,
+        * updates the parse position to the index after the last character used.
+        * The parsing is strict and requires months to be at most 12, days to be
+        * at most 31, etc.
+        *
+        * @param s       string to be parsed
+        * @param dateFormat Date format
+        * @param tz      time zone in which to interpret string. Defaults to the Java
+        *                default time zone
+        * @param pp      position to start parsing from
+        * @return a Calendar initialized with the parsed value, or null if parsing
+        * failed. If returned, the Calendar is configured to the UTC time zone.
+        */
+       private static Calendar parseDateFormat(String s, DateFormat dateFormat,
+                                                                               
        TimeZone tz, ParsePosition pp) {
+               if (tz == null) {
+                       tz = DEFAULT_ZONE;
+               }
+               Calendar ret = Calendar.getInstance(tz, Locale.ROOT);
+               dateFormat.setCalendar(ret);
+               dateFormat.setLenient(false);
+
+               final Date d = dateFormat.parse(s, pp);
+               if (null == d) {
+                       return null;
+               }
+               ret.setTime(d);
+               ret.setTimeZone(UTC_ZONE);
+               return ret;
+       }
+
+       @Deprecated // to be removed before 2.0
+       public static Calendar parseDateFormat(String s, String pattern,
+                                                                               
   TimeZone tz) {
+               return parseDateFormat(s, new SimpleDateFormat(pattern, 
Locale.ROOT), tz);
+       }
+
+       /**
+        * Parses a string using {@link SimpleDateFormat} and a given pattern. The
+        * entire string must match the pattern specified.
+        *
+        * @param s       string to be parsed
+        * @param dateFormat Date format
+        * @param tz      time zone in which to interpret string. Defaults to the Java
+        *                default time zone
+        * @return a Calendar initialized with the parsed value, or null if parsing
+        * failed. If returned, the Calendar is configured to the UTC time zone.
+        */
+       public static Calendar parseDateFormat(String s, DateFormat dateFormat,
+                                                                               
   TimeZone tz) {
+               ParsePosition pp = new ParsePosition(0);
+               Calendar ret = parseDateFormat(s, dateFormat, tz, pp);
+               if (pp.getIndex() != s.length()) {
+                       // Didn't consume entire string - not good
+                       return null;
+               }
+               return ret;
+       }
+
+       @Deprecated // to be removed before 2.0
+       public static PrecisionTime parsePrecisionDateTimeLiteral(
+               String s,
+               String pattern,
+               TimeZone tz) {
+               assert pattern != null;
+               return parsePrecisionDateTimeLiteral(s,
+                       new SimpleDateFormat(pattern, Locale.ROOT), tz, 3);
+       }
+
+       /**
+        * Parses a string using {@link SimpleDateFormat} and a given pattern, and
+        * if present, parses a fractional seconds component. The fractional seconds
+        * component must begin with a decimal point ('.') followed by numeric
+        * digits. The fraction is truncated to at most {@code maxPrecision} digits,
+        * and only its first 3 digits (milliseconds) are added to the calendar.
+        *
+        * @param s       string to be parsed
+        * @param dateFormat Date format
+        * @param tz      time zone in which to interpret string. Defaults to the
+        *                local time zone
+        * @param maxPrecision maximum number of fractional digits to keep; a
+        *                negative value keeps all digits
+        * @return a {@link DateTimeUtils.PrecisionTime PrecisionTime} initialized
+        * with the parsed value, or null if parsing failed. The PrecisionTime
+        * contains a UTC Calendar and a precision.
+        */
+       public static PrecisionTime parsePrecisionDateTimeLiteral(String s,
+                                                                               
                                          DateFormat dateFormat, TimeZone tz, 
int maxPrecision) {
+               final ParsePosition pp = new ParsePosition(0);
+               final Calendar cal = parseDateFormat(s, dateFormat, tz, pp);
+               if (cal == null) {
+                       return null; // Invalid date/time format
+               }
+
+               // Note: the Java SimpleDateFormat 'S' treats any number after
+               // the decimal as milliseconds. That means 12:00:00.9 has 9
+               // milliseconds and 12:00:00.9999 has 9999 milliseconds.
+               int p = 0;
+               String secFraction = "";
+               if (pp.getIndex() < s.length()) {
+                       // Check to see if rest is decimal portion
+                       if (s.charAt(pp.getIndex()) != '.') {
+                               return null;
+                       }
+
+                       // Skip decimal sign
+                       pp.setIndex(pp.getIndex() + 1);
+
+                       // Parse decimal portion
+                       if (pp.getIndex() < s.length()) {
+                               secFraction = s.substring(pp.getIndex());
+                               if (!secFraction.matches("\\d+")) {
+                                       return null;
+                               }
+                               NumberFormat nf = 
NumberFormat.getIntegerInstance(Locale.ROOT);
+                               Number num = nf.parse(s, pp);
+                               if ((num == null) || (pp.getIndex() != 
s.length())) {
+                                       // Invalid decimal portion
+                                       return null;
+                               }
+
+                               // Determine precision - only support prec 3 or lower
+                               // (milliseconds) Higher precisions are quietly rounded away
+                               p = secFraction.length();
+                               if (maxPrecision >= 0) {
+                                       // If there is a maximum precision, ignore subsequent digits
+                                       p = Math.min(maxPrecision, p);
+                                       secFraction = secFraction.substring(0, 
p);
+                               }
+
+                               // Calculate milliseconds
+                               String millis = secFraction;
+                               if (millis.length() > 3) {
+                                       millis = secFraction.substring(0, 3);
+                               }
+                               while (millis.length() < 3) {
+                                       millis = millis + "0";
+                               }
+
+                               int ms = Integer.valueOf(millis);
+                               cal.add(Calendar.MILLISECOND, ms);
+                       }
+               }
+
+               assert pp.getIndex() == s.length();
+               return new PrecisionTime(cal, secFraction, p);
+       }
+
+       /**
+        * Gets the active time zone based on a Calendar argument
+        */
+       public static TimeZone getTimeZone(Calendar cal) {
+               if (cal == null) {
+                       return DEFAULT_ZONE;
+               }
+               return cal.getTimeZone();
+       }
+
+       /**
+        * Checks if the date/time format is valid
+        *
+        * @param pattern {@link SimpleDateFormat}  pattern
+        * @throws IllegalArgumentException if the given pattern is invalid
+        */
+       public static void checkDateFormat(String pattern) {
+               new SimpleDateFormat(pattern, Locale.ROOT);
+       }
+
+       /**
+        * Creates a new date formatter with Farrago specific options. Farrago
+        * parsing is strict and does not allow values such as day 0, month 13, etc.
+        *
+        * @param format {@link SimpleDateFormat}  pattern
+        */
+       public static SimpleDateFormat newDateFormat(String format) {
+               SimpleDateFormat sdf = new SimpleDateFormat(format, 
Locale.ROOT);
+               sdf.setLenient(false);
+               return sdf;
+       }
+
+       /** Helper for CAST({timestamp} AS VARCHAR(n)). */
+       public static String unixTimestampToString(long timestamp) {
+               return unixTimestampToString(timestamp, 0);
+       }
+
+       public static String unixTimestampToString(long timestamp, int 
precision) {
+               final StringBuilder buf = new StringBuilder(17);
+               int date = (int) (timestamp / MILLIS_PER_DAY);
+               int time = (int) (timestamp % MILLIS_PER_DAY);
+               if (time < 0) {
+                       --date;
+                       time += MILLIS_PER_DAY;
+               }
+               unixDateToString(buf, date);
+               buf.append(' ');
+               unixTimeToString(buf, time, precision);
+               return buf.toString();
+       }
+
+       /** Helper for CAST({time} AS VARCHAR(n)). */
+       public static String unixTimeToString(int time) {
+               return unixTimeToString(time, 0);
+       }
+
+       public static String unixTimeToString(int time, int precision) {
+               final StringBuilder buf = new StringBuilder(8);
+               unixTimeToString(buf, time, precision);
+               return buf.toString();
+       }
+
+       private static void unixTimeToString(StringBuilder buf, int time,
+                                                                               
 int precision) {
+               int h = time / 3600000;
+               int time2 = time % 3600000;
+               int m = time2 / 60000;
+               int time3 = time2 % 60000;
+               int s = time3 / 1000;
+               int ms = time3 % 1000;
+               int2(buf, h);
+               buf.append(':');
+               int2(buf, m);
+               buf.append(':');
+               int2(buf, s);
+               if (precision > 0) {
+                       buf.append('.');
+                       while (precision > 0) {
+                               buf.append((char) ('0' + (ms / 100)));
+                               ms = ms % 100;
+                               ms = ms * 10;
+                               --precision;
+                       }
+               }
+       }
+
+       private static void int2(StringBuilder buf, int i) {
+               buf.append((char) ('0' + (i / 10) % 10));
+               buf.append((char) ('0' + i % 10));
+       }
+
+       private static void int4(StringBuilder buf, int i) {
+               buf.append((char) ('0' + (i / 1000) % 10));
+               buf.append((char) ('0' + (i / 100) % 10));
+               buf.append((char) ('0' + (i / 10) % 10));
+               buf.append((char) ('0' + i % 10));
+       }
+
+       /** Helper for CAST({date} AS VARCHAR(n)). */
+       public static String unixDateToString(int date) {
+               final StringBuilder buf = new StringBuilder(10);
+               unixDateToString(buf, date);
+               return buf.toString();
+       }
+
+       private static void unixDateToString(StringBuilder buf, int date) {
+               julianToString(buf, date + EPOCH_JULIAN);
+       }
+
+       private static void julianToString(StringBuilder buf, int julian) {
+               // Algorithm from the book "Astronomical Algorithms" by Jean Meeus, 1998
+               int b, c;
+               if (julian > 2299160) {
+                       int a = julian + 32044;
+                       b = (4 * a + 3) / 146097;
+                       c = a - b *146097 / 4;
+               } else {
+                       b = 0;
+                       c = julian + 32082;
+               }
+               int d = (4 * c + 3) / 1461;
+               int e = c - (1461 * d) / 4;
+               int m = (5 * e + 2) / 153;
+               int day = e - (153 * m + 2) / 5 + 1;
+               int month = m + 3 - 12 * (m / 10);
+               int year = b * 100 + d - 4800 + (m / 10);
+
+               int4(buf, year);
+               buf.append('-');
+               int2(buf, month);
+               buf.append('-');
+               int2(buf, day);
+       }
+
+       public static String intervalYearMonthToString(int v, TimeUnitRange 
range) {
+               final StringBuilder buf = new StringBuilder();
+               if (v >= 0) {
+                       buf.append('+');
+               } else {
+                       buf.append('-');
+                       v = -v;
+               }
+               final int y;
+               final int m;
+               switch (range) {
+                       case YEAR:
+                               v = roundUp(v, 12);
+                               y = v / 12;
+                               buf.append(y);
+                               break;
+                       case YEAR_TO_MONTH:
+                               y = v / 12;
+                               buf.append(y);
+                               buf.append('-');
+                               m = v % 12;
+                               number(buf, m, 2);
+                               break;
+                       case MONTH:
+                               m = v;
+                               buf.append(m);
+                               break;
+                       default:
+                               throw new AssertionError(range);
+               }
+               return buf.toString();
+       }
+
+       public static StringBuilder number(StringBuilder buf, int v, int n) {
+               for (int k = digitCount(v); k < n; k++) {
+                       buf.append('0');
+               }
+               return buf.append(v);
+       }
+
+       public static int digitCount(int v) {
+               for (int n = 1;; n++) {
+                       v /= 10;
+                       if (v == 0) {
+                               return n;
+                       }
+               }
+       }
+
+       private static int roundUp(int dividend, int divisor) {
+               int remainder = dividend % divisor;
+               dividend -= remainder;
+               if (remainder * 2 > divisor) {
+                       dividend += divisor;
+               }
+               return dividend;
+       }
+
+       /** Cheap, unsafe, long power. powerX(2, 3) returns 8. */
+       public static long powerX(long a, long b) {
+               long x = 1;
+               while (b > 0) {
+                       x *= a;
+                       --b;
+               }
+               return x;
+       }
+
+       public static String intervalDayTimeToString(long v, TimeUnitRange 
range,
+                                                                               
                 int scale) {
+               final StringBuilder buf = new StringBuilder();
+               if (v >= 0) {
+                       buf.append('+');
+               } else {
+                       buf.append('-');
+                       v = -v;
+               }
+               final long ms;
+               final long s;
+               final long m;
+               final long h;
+               final long d;
+               switch (range) {
+                       case DAY_TO_SECOND:
+                               v = roundUp(v, powerX(10, 3 - scale));
+                               ms = v % 1000;
+                               v /= 1000;
+                               s = v % 60;
+                               v /= 60;
+                               m = v % 60;
+                               v /= 60;
+                               h = v % 24;
+                               v /= 24;
+                               d = v;
+                               buf.append((int) d);
+                               buf.append(' ');
+                               number(buf, (int) h, 2);
+                               buf.append(':');
+                               number(buf, (int) m, 2);
+                               buf.append(':');
+                               number(buf, (int) s, 2);
+                               fraction(buf, scale, ms);
+                               break;
+                       case DAY_TO_MINUTE:
+                               v = roundUp(v, 1000 * 60);
+                               v /= 1000;
+                               v /= 60;
+                               m = v % 60;
+                               v /= 60;
+                               h = v % 24;
+                               v /= 24;
+                               d = v;
+                               buf.append((int) d);
+                               buf.append(' ');
+                               number(buf, (int) h, 2);
+                               buf.append(':');
+                               number(buf, (int) m, 2);
+                               break;
+                       case DAY_TO_HOUR:
+                               v = roundUp(v, 1000 * 60 * 60);
+                               v /= 1000;
+                               v /= 60;
+                               v /= 60;
+                               h = v % 24;
+                               v /= 24;
+                               d = v;
+                               buf.append((int) d);
+                               buf.append(' ');
+                               number(buf, (int) h, 2);
+                               break;
+                       case DAY:
+                               v = roundUp(v, 1000 * 60 * 60 * 24);
+                               d = v / (1000 * 60 * 60 * 24);
+                               buf.append((int) d);
+                               break;
+                       case HOUR:
+                               v = roundUp(v, 1000 * 60 * 60);
+                               v /= 1000;
+                               v /= 60;
+                               v /= 60;
+                               h = v;
+                               buf.append((int) h);
+                               break;
+                       case HOUR_TO_MINUTE:
+                               v = roundUp(v, 1000 * 60);
+                               v /= 1000;
+                               v /= 60;
+                               m = v % 60;
+                               v /= 60;
+                               h = v;
+                               buf.append((int) h);
+                               buf.append(':');
+                               number(buf, (int) m, 2);
+                               break;
+                       case HOUR_TO_SECOND:
+                               v = roundUp(v, powerX(10, 3 - scale));
+                               ms = v % 1000;
+                               v /= 1000;
+                               s = v % 60;
+                               v /= 60;
+                               m = v % 60;
+                               v /= 60;
+                               h = v;
+                               buf.append((int) h);
+                               buf.append(':');
+                               number(buf, (int) m, 2);
+                               buf.append(':');
+                               number(buf, (int) s, 2);
+                               fraction(buf, scale, ms);
+                               break;
+                       case MINUTE_TO_SECOND:
+                               v = roundUp(v, powerX(10, 3 - scale));
+                               ms = v % 1000;
+                               v /= 1000;
+                               s = v % 60;
+                               v /= 60;
+                               m = v;
+                               buf.append((int) m);
+                               buf.append(':');
+                               number(buf, (int) s, 2);
+                               fraction(buf, scale, ms);
+                               break;
+                       case MINUTE:
+                               v = roundUp(v, 1000 * 60);
+                               v /= 1000;
+                               v /= 60;
+                               m = v;
+                               buf.append((int) m);
+                               break;
+                       case SECOND:
+                               v = roundUp(v, powerX(10, 3 - scale));
+                               ms = v % 1000;
+                               v /= 1000;
+                               s = v;
+                               buf.append((int) s);
+                               fraction(buf, scale, ms);
+                               break;
+                       default:
+                               throw new AssertionError(range);
+               }
+               return buf.toString();
+       }
+
+       /**
+        * Rounds a dividend to the nearest multiple of the divisor; an exact half rounds down.
+        * For example roundUp(31, 10) yields 30; roundUp(37, 10) yields 40.
+        * @param dividend Number to be divided
+        * @param divisor Number to divide by
+        * @return Rounded dividend
+        */
+       private static long roundUp(long dividend, long divisor) {
+               long remainder = dividend % divisor;
+               dividend -= remainder;
+               if (remainder * 2 > divisor) {
+                       dividend += divisor;
+               }
+               return dividend;
+       }
+
+       private static void fraction(StringBuilder buf, int scale, long ms) {
+               if (scale > 0) {
+                       buf.append('.');
+                       long v1 = scale == 3 ? ms
+                               : scale == 2 ? ms / 10
+                               : scale == 1 ? ms / 100
+                               : 0;
+                       number(buf, (int) v1, scale);
+               }
+       }
+
+       public static int dateStringToUnixDate(String s) {
+               int hyphen1 = s.indexOf('-');
+               int y;
+               int m;
+               int d;
+               if (hyphen1 < 0) {
+                       y = Integer.parseInt(s.trim());
+                       m = 1;
+                       d = 1;
+               } else {
+                       y = Integer.parseInt(s.substring(0, hyphen1).trim());
+                       final int hyphen2 = s.indexOf('-', hyphen1 + 1);
+                       if (hyphen2 < 0) {
+                               m = Integer.parseInt(s.substring(hyphen1 + 
1).trim());
+                               d = 1;
+                       } else {
+                               m = Integer.parseInt(s.substring(hyphen1 + 1, 
hyphen2).trim());
+                               d = Integer.parseInt(s.substring(hyphen2 + 
1).trim());
+                       }
+               }
+               return ymdToUnixDate(y, m, d);
+       }
+
+       public static int timeStringToUnixDate(String v) {
+               return timeStringToUnixDate(v, 0);
+       }
+
+       /** Parses a time of the form {@code h[:m[:s[.fraction]]]} that begins at
+        * offset {@code start} of {@code v} and converts it to milliseconds since
+        * midnight.
+        *
+        * <p>Components that are absent are taken to be zero, so {@code "12"} and
+        * {@code "12:00"} both mean 12:00:00.000.
+        *
+        * @param v     string containing the time
+        * @param start offset in {@code v} at which the time begins
+        * @return the time as a number of milliseconds since midnight
+        * @throws NumberFormatException if the hour, minute or second is not a
+        *                               valid integer
+        */
+       public static int timeStringToUnixDate(String v, int start) {
+               final int colon1 = v.indexOf(':', start);
+               final int hour;
+               // Absent components default to zero (they used to default to 1 for
+               // minute and second, which shifted e.g. "12" to 12:01:01).
+               int minute = 0;
+               int second = 0;
+               int milli = 0;
+               if (colon1 < 0) {
+                       // Hour only. Parse from 'start', so that text before the time
+                       // (such as the date part of a timestamp) is not read as the hour.
+                       hour = Integer.parseInt(v.substring(start).trim());
+               } else {
+                       hour = Integer.parseInt(v.substring(start, colon1).trim());
+                       final int colon2 = v.indexOf(':', colon1 + 1);
+                       if (colon2 < 0) {
+                               minute = Integer.parseInt(v.substring(colon1 + 1).trim());
+                       } else {
+                               minute = Integer.parseInt(v.substring(colon1 + 1, colon2).trim());
+                               final int dot = v.indexOf('.', colon2);
+                               if (dot < 0) {
+                                       second = Integer.parseInt(v.substring(colon2 + 1).trim());
+                               } else {
+                                       second = Integer.parseInt(v.substring(colon2 + 1, dot).trim());
+                                       // First fractional digit is worth 100 ms.
+                                       milli = parseFraction(v.substring(dot + 1).trim(), 100);
+                               }
+                       }
+               }
+               return hour * (int) MILLIS_PER_HOUR
+                       + minute * (int) MILLIS_PER_MINUTE
+                       + second * (int) MILLIS_PER_SECOND
+                       + milli;
+       }
+
+       /** Parses the digits after a decimal point. The first character is weighted
+        * by {@code multiplier}, the second by {@code multiplier / 10}, the third
+        * by {@code multiplier / 100}, and so forth. Characters that are not
+        * decimal digits count as zero. Once the weight has dropped below 10 the
+        * result is rounded up if the following character is {@code '5'} or
+        * greater, and parsing stops.
+        *
+        * <p>For example, {@code parseFraction("1234", 100)} yields {@code 123}. */
+       private static int parseFraction(String v, int multiplier) {
+               final int length = v.length();
+               int result = 0;
+               int weight = multiplier;
+               int pos = 0;
+               while (pos < length) {
+                       final char ch = v.charAt(pos);
+                       if (ch >= '0' && ch <= '9') {
+                               result += weight * (ch - '0');
+                       }
+                       if (weight < 10) {
+                               // This was the last significant digit; round on the next one.
+                               final int next = pos + 1;
+                               if (next < length && v.charAt(next) >= '5') {
+                                       result++;
+                               }
+                               return result;
+                       }
+                       weight /= 10;
+                       pos++;
+               }
+               return result;
+       }
+
+       /** Parses a timestamp string consisting of a date, optionally followed by a
+        * space and a time, and returns it as milliseconds since the epoch. If
+        * there is no time part, the time is midnight. */
+       public static long timestampStringToUnixDate(String s) {
+               final String text = s.trim();
+               final int separator = text.indexOf(' ');
+               if (separator < 0) {
+                       // Date only.
+                       final long days = dateStringToUnixDate(text);
+                       return days * MILLIS_PER_DAY;
+               }
+               final long days = dateStringToUnixDate(text.substring(0, separator));
+               final long millisOfDay = timeStringToUnixDate(text, separator + 1);
+               return days * MILLIS_PER_DAY + millisOfDay;
+       }
+
+       /** Extracts a field from a date given as a number of days since the epoch,
+        * by converting it to a Julian day number and delegating to
+        * {@code julianExtract}. */
+       public static long unixDateExtract(TimeUnitRange range, long date) {
+               final int julian = (int) date + EPOCH_JULIAN;
+               return julianExtract(range, julian);
+       }
+
+       /** Extracts a calendar field from a Julian day number.
+        *
+        * <p>The day number is first decomposed into year, month and day of
+        * month, and the requested field is then derived from those values.
+        *
+        * @param range  field to extract; only the units listed in the switch
+        *               below are supported
+        * @param julian Julian day number
+        * @return value of the requested field
+        * @throws AssertionError if {@code range} is not one of the handled units
+        */
+       private static int julianExtract(TimeUnitRange range, int julian) {
+               // Algorithm from the book "Astronomical Algorithms" by Jean Meeus, 1998
+               int b, c;
+               // Day numbers above 2299160 take the branch with the century term 'b'
+               // (presumably the Gregorian calendar; ymdToJulian switches formula at
+               // the same boundary). Earlier day numbers use the simpler formula.
+               if (julian > 2299160) {
+                       int a = julian + 32044;
+                       b = (4 * a + 3) / 146097;
+                       c = a - b *146097 / 4;
+               } else {
+                       b = 0;
+                       c = julian + 32082;
+               }
+               int d = (4 * c + 3) / 1461;
+               int e = c - (1461 * d) / 4;
+               // m counts months starting from March (0 = March, ..., 11 = February).
+               int m = (5 * e + 2) / 153;
+               int day = e - (153 * m + 2) / 5 + 1;
+               int month = m + 3 - 12 * (m / 10);
+               int year = b * 100 + d - 4800 + (m / 10);
+
+               switch (range) {
+                       case YEAR:
+                               return year;
+                       case QUARTER:
+                               return (month + 2) / 3;
+                       case MONTH:
+                               return month;
+                       case DAY:
+                               return day;
+                       case DOW:
+                               return (int) floorMod(julian + 1, 7) + 1; // sun=1, sat=7
+                       case WEEK:
+                               // Week number counted from the Monday that starts week 1 of
+                               // 'year'; days before that Monday are counted from the
+                               // previous year's week 1.
+                               // NOTE(review): a late-December day on or after the Monday
+                               // that starts week 1 of the FOLLOWING year (e.g. 2018-12-31)
+                               // yields 53 here, not 1 -- confirm whether that is intended.
+                               long fmofw = firstMondayOfFirstWeek(year);
+                               if (julian < fmofw) {
+                                       fmofw = firstMondayOfFirstWeek(year - 1);
+                               }
+                               return (int) (julian - fmofw) / 7 + 1;
+                       case DOY:
+                               final long janFirst = ymdToJulian(year, 1, 1);
+                               return (int) (julian - janFirst) + 1;
+                       case CENTURY:
+                               // Rounds away from zero: year 2000 -> 20, year 2001 -> 21.
+                               return year > 0
+                                       ? (year + 99) / 100
+                                       : (year - 99) / 100;
+                       case MILLENNIUM:
+                               return year > 0
+                                       ? (year + 999) / 1000
+                                       : (year - 999) / 1000;
+                       default:
+                               throw new AssertionError(range);
+               }
+       }
+
+       /** Returns the Julian day number of the first day of the first week of a
+        * year. Per ISO-8601 that is the Monday of the week containing Jan 4, or
+        * equivalently the Monday that falls between Dec 29 and Jan 4, so it may
+        * lie in the year before the given year. */
+       private static long firstMondayOfFirstWeek(int year) {
+               final long jan1 = ymdToJulian(year, 1, 1);
+               // Day of week of Jan 1: sun=0, sat=6
+               final long jan1Dow = floorMod(jan1 + 1, 7);
+               final long offset = (11 - jan1Dow) % 7 - 3;
+               return jan1 + offset;
+       }
+
+       /** Extracts a time unit from a UNIX timestamp (milliseconds since epoch).
+        * The timestamp is reduced to its milliseconds-since-midnight part, which
+        * is then handed to {@link #unixTimeExtract}. */
+       public static int unixTimestampExtract(TimeUnitRange range,
+                       long timestamp) {
+               final long millisOfDay = floorMod(timestamp, MILLIS_PER_DAY);
+               return unixTimeExtract(range, (int) millisOfDay);
+       }
+
+       /** Extracts the hour, minute or second from a time value expressed as
+        * milliseconds since midnight. Any other unit raises an
+        * {@link AssertionError}. */
+       public static int unixTimeExtract(TimeUnitRange range, int time) {
+               assert time >= 0;
+               assert time < MILLIS_PER_DAY;
+               switch (range) {
+                       case SECOND:
+                               return (time / (int) MILLIS_PER_SECOND) % 60;
+                       case MINUTE:
+                               return (time / (int) MILLIS_PER_MINUTE) % 60;
+                       case HOUR:
+                               return time / (int) MILLIS_PER_HOUR;
+                       default:
+                               throw new AssertionError(range);
+               }
+       }
+
+       /** Resets to zero the "time" part of a timestamp, i.e. moves the timestamp
+        * back to the midnight that starts its day.
+        *
+        * @param timestamp milliseconds since the epoch; may be negative
+        * @return milliseconds since the epoch of the start of that day
+        */
+       public static long resetTime(long timestamp) {
+               // Floor division rather than '/': Java's '/' rounds towards zero, which
+               // would move a timestamp before the epoch forward to the next midnight
+               // and break resetTime(t) + resetDate(t) == t.
+               int date = (int) floorDiv(timestamp, MILLIS_PER_DAY);
+               return (long) date * MILLIS_PER_DAY;
+       }
+
+       /** Resets to epoch (1970-01-01) the "date" part of a timestamp, leaving
+        * only the milliseconds elapsed since midnight. */
+       public static long resetDate(long timestamp) {
+               final long millisOfDay = floorMod(timestamp, MILLIS_PER_DAY);
+               return millisOfDay;
+       }
+
+       /** Rounds a timestamp down to the start of the year or month containing it.
+        *
+        * @param range     unit to round to; {@code julianDateFloor} handles
+        *                  {@code YEAR} and {@code MONTH}
+        * @param timestamp milliseconds since the epoch; may be negative
+        * @return milliseconds since the epoch of the start of the unit
+        */
+       public static long unixTimestampFloor(TimeUnitRange range, long timestamp) {
+               // Floor division: with '/', a timestamp before the epoch would be
+               // assigned to the following day, and e.g. 1969-12-31T23:59:59.999 would
+               // "floor" to 1970-01-01, which is later than the input.
+               int date = (int) floorDiv(timestamp, MILLIS_PER_DAY);
+               final int f = julianDateFloor(range, date + EPOCH_JULIAN, true);
+               return (long) f * MILLIS_PER_DAY;
+       }
+
+       /** Rounds a date, given as days since the epoch, down to the first day of
+        * the year or month containing it; the result is again in days since the
+        * epoch. */
+       public static long unixDateFloor(TimeUnitRange range, long date) {
+               final int julian = (int) date + EPOCH_JULIAN;
+               return julianDateFloor(range, julian, true);
+       }
+
+       /** Rounds a timestamp up to the nearest start of a year or month that is
+        * not before the timestamp.
+        *
+        * @param range     unit to round to; {@code julianDateFloor} handles
+        *                  {@code YEAR} and {@code MONTH}
+        * @param timestamp milliseconds since the epoch; may be negative
+        * @return milliseconds since the epoch of the unit boundary
+        */
+       public static long unixTimestampCeil(TimeUnitRange range, long timestamp) {
+               int date = (int) floorDiv(timestamp, MILLIS_PER_DAY);
+               if (floorMod(timestamp, MILLIS_PER_DAY) > 0) {
+                       // The timestamp lies after midnight, so the ceiling cannot be
+                       // the start of this day; begin rounding from the next day.
+                       // Otherwise 2017-01-01T00:00:01 would "ceil" to 2017-01-01.
+                       ++date;
+               }
+               final int f = julianDateFloor(range, date + EPOCH_JULIAN, false);
+               return (long) f * MILLIS_PER_DAY;
+       }
+
+       /** Rounds a date, given as days since the epoch, up to the first day of a
+        * year or month that is not before the date.
+        *
+        * @param range unit to round to; {@code julianDateFloor} handles
+        *              {@code YEAR} and {@code MONTH}
+        * @param date  days since the epoch
+        * @return days since the epoch of the unit boundary
+        */
+       public static long unixDateCeil(TimeUnitRange range, long date) {
+               // 'false' selects ceiling; passing 'true' here made this method
+               // behave exactly like unixDateFloor.
+               return julianDateFloor(range, (int) date + EPOCH_JULIAN, false);
+       }
+
+       /** Rounds a Julian day number to the start of a year or month.
+        *
+        * <p>Despite the name, the result is a UNIX date (days since the epoch),
+        * because it is produced by {@link #ymdToUnixDate}.
+        *
+        * @param range  unit to round to; only {@code YEAR} and {@code MONTH} are
+        *               handled
+        * @param julian Julian day number to round
+        * @param floor  if true, round down to the start of the containing unit;
+        *               if false, round up to the start of the next unit unless
+        *               the day is already the first day of a unit
+        * @return the rounded date as days since the epoch
+        * @throws AssertionError if {@code range} is not handled
+        */
+       private static int julianDateFloor(TimeUnitRange range, int julian,
+                       boolean floor) {
+               // Algorithm from the book "Astronomical Algorithms" by Jean Meeus, 1998
+               int b, c;
+               // Same decomposition into year/month/day as in julianExtract.
+               if (julian > 2299160) {
+                       int a = julian + 32044;
+                       b = (4 * a + 3) / 146097;
+                       c = a - b *146097 / 4;
+               } else {
+                       b = 0;
+                       c = julian + 32082;
+               }
+               int d = (4 * c + 3) / 1461;
+               int e = c - (1461 * d) / 4;
+               int m = (5 * e + 2) / 153;
+               int day = e - (153 * m + 2) / 5 + 1;
+               int month = m + 3 - 12 * (m / 10);
+               int year = b * 100 + d - 4800 + (m / 10);
+
+               switch (range) {
+                       case YEAR:
+                               // Ceiling: any day other than Jan 1 moves to the next year.
+                               if (!floor && (month > 1 || day > 1)) {
+                                       ++year;
+                               }
+                               return ymdToUnixDate(year, 1, 1);
+                       case MONTH:
+                               // Ceiling: any day other than the 1st moves to the next month.
+                               // NOTE(review): for a December date this makes month 13 and
+                               // relies on ymdToJulian treating month 13 as January of the
+                               // following year -- confirm.
+                               if (!floor && day > 1) {
+                                       ++month;
+                               }
+                               return ymdToUnixDate(year, month, 1);
+                       default:
+                               throw new AssertionError(range);
+               }
+       }
+
+       /** Converts a year, month and day of month to a number of days since the
+        * epoch, by subtracting the Julian day number of the epoch from the Julian
+        * day number of the date. */
+       public static int ymdToUnixDate(int year, int month, int day) {
+               return ymdToJulian(year, month, day) - EPOCH_JULIAN;
+       }
+
+       /** Converts a year, month and day of month to a Julian day number.
+        *
+        * <p>The day number is first computed with the formula that includes the
+        * century corrections ({@code - y / 100 + y / 400}); if the result is
+        * below 2299161 it is recomputed without them.
+        *
+        * @param year  year
+        * @param month month, 1 = January
+        * @param day   day of month
+        * @return Julian day number
+        */
+       public static int ymdToJulian(int year, int month, int day) {
+               // a is 1 for January and February, 0 for March..December, so that the
+               // year below effectively starts in March.
+               int a = (14 - month) / 12;
+               int y = year + 4800 - a;
+               // Month index counted from March (March = 0, ..., February = 11).
+               int m = month + 12 * a - 3;
+               int j = day + (153 * m + 2) / 5
+                       + 365 * y
+                       + y / 4
+                       - y / 100
+                       + y / 400
+                       - 32045;
+               // Below this day number, use the formula without century corrections
+               // (presumably the Julian calendar before the Gregorian cutover).
+               if (j < 2299161) {
+                       j = day + (153 * m + 2) / 5 + 365 * y + y / 4 - 32083;
+               }
+               return j;
+       }
+
+       /** Builds a timestamp, in milliseconds since the epoch, from a date and a
+        * time of day given as separate fields. */
+       public static long unixTimestamp(int year, int month, int day, int hour,
+                       int minute, int second) {
+               final long days = ymdToUnixDate(year, month, day);
+               long millis = days * MILLIS_PER_DAY;
+               millis += (long) hour * MILLIS_PER_HOUR;
+               millis += (long) minute * MILLIS_PER_MINUTE;
+               millis += (long) second * MILLIS_PER_SECOND;
+               return millis;
+       }
+
+       /** Adds a given number of months to a timestamp, represented as the number
+        * of milliseconds since the epoch. The time of day is kept; only the date
+        * part is shifted. */
+       public static long addMonths(long timestamp, int m) {
+               final long millisOfDay = floorMod(timestamp, MILLIS_PER_DAY);
+               final int days = (int) ((timestamp - millisOfDay) / MILLIS_PER_DAY);
+               final long shiftedDays = addMonths(days, m);
+               return shiftedDays * MILLIS_PER_DAY + millisOfDay;
+       }
+
+       /** Adds a given number of months to a date, represented as the number of
+        * days since the epoch.
+        *
+        * <p>If the day of month does not exist in the target month (for example
+        * Jan 31 plus one month), the result is the first day of the month after
+        * the target month.
+        *
+        * @param date days since the epoch
+        * @param m    number of months to add; may be negative
+        * @return the shifted date as days since the epoch
+        */
+       public static int addMonths(int date, int m) {
+               int y0 = (int) DateTimeUtils.unixDateExtract(TimeUnitRange.YEAR, date);
+               int m0 = (int) DateTimeUtils.unixDateExtract(TimeUnitRange.MONTH, date);
+               int d0 = (int) DateTimeUtils.unixDateExtract(TimeUnitRange.DAY, date);
+               // Zero-based month count relative to January of y0. Normalise it with
+               // floor arithmetic so that the month passed to lastDay() and
+               // ymdToUnixDate() is always within 1..12; previously it could range
+               // from -10 to 23, for which lastDay() always answered 31.
+               final long months = (long) m0 - 1 + m;
+               y0 += (int) DateTimeUtils.floorDiv(months, 12);
+               m0 = (int) DateTimeUtils.floorMod(months, 12) + 1;
+               int last = lastDay(y0, m0);
+               if (d0 > last) {
+                       // Day does not exist in the target month: roll to the 1st of the
+                       // following month.
+                       d0 = 1;
+                       if (++m0 > 12) {
+                               m0 = 1;
+                               ++y0;
+                       }
+               }
+               return DateTimeUtils.ymdToUnixDate(y0, m0, d0);
+       }
+
+       /** Returns the number of days in month {@code m} of year {@code y}.
+        * February has 29 days when the year is divisible by 4 and either not
+        * divisible by 100 or divisible by 400. Any month other than 2, 4, 6, 9
+        * and 11 is reported as having 31 days. */
+       private static int lastDay(int y, int m) {
+               if (m == 2) {
+                       final boolean leap = y % 4 == 0
+                               && (y % 100 != 0 || y % 400 == 0);
+                       return leap ? 29 : 28;
+               }
+               if (m == 4 || m == 6 || m == 9 || m == 11) {
+                       return 30;
+               }
+               return 31;
+       }
+
+       /** Finds the number of months between two dates, each represented as the
+        * number of days since the epoch. The result is negative when
+        * {@code date0} is earlier than {@code date1}. */
+       public static int subtractMonths(int date0, int date1) {
+               if (date0 < date1) {
+                       return -subtractMonths(date1, date0);
+               }
+               // Initial estimate. No month is longer than 31 days, so the estimate
+               // never exceeds the true value and only needs to be stepped upwards.
+               int months = (date0 - date1) / 31;
+               // Step while date1 + months is still before date0 and adding one more
+               // month would not go past date0.
+               while (addMonths(date1, months) < date0
+                       && addMonths(date1, months + 1) <= date0) {
+                       months++;
+               }
+               return months;
+       }
+
+       /** Finds the number of months between two timestamps, each represented as
+        * milliseconds since the epoch. The count obtained from the date parts is
+        * reduced by one when it lands exactly on the day of {@code t0} but the
+        * time of day of {@code t0} is earlier than that of {@code t1}. */
+       public static int subtractMonths(long t0, long t1) {
+               final long time0 = floorMod(t0, MILLIS_PER_DAY);
+               final long time1 = floorMod(t1, MILLIS_PER_DAY);
+               final int day0 = (int) floorDiv(t0 - time0, MILLIS_PER_DAY);
+               final int day1 = (int) floorDiv(t1 - time1, MILLIS_PER_DAY);
+               final int months = subtractMonths(day0, day1);
+               final boolean landsOnDay0 = addMonths(day1, months) == day0;
+               return (landsOnDay0 && time0 < time1) ? months - 1 : months;
+       }
+
+       /** Divides {@code x} by {@code y}, rounding the quotient towards negative
+        * infinity (Java's {@code /} rounds towards zero). */
+       public static long floorDiv(long x, long y) {
+               final long quotient = x / y;
+               final boolean signsDiffer = (x ^ y) < 0;
+               final boolean exact = quotient * y == x;
+               // Truncation rounded up only if the operands have different signs and
+               // the division left a remainder; compensate by stepping down once.
+               if (signsDiffer && !exact) {
+                       return quotient - 1;
+               }
+               return quotient;
+       }
+
+       /** Modulo that matches {@link #floorDiv}: returns
+        * {@code x - floorDiv(x, y) * y}, which is non-negative whenever
+        * {@code y} is positive. */
+       public static long floorMod(long x, long y) {
+               final long quotient = floorDiv(x, y);
+               return x - quotient * y;
+       }
+
+       /** Creates a new {@link Calendar} that uses the root locale and the UTC
+        * time zone. */
+       public static Calendar calendar() {
+               final Calendar utcCalendar = Calendar.getInstance(UTC_ZONE, Locale.ROOT);
+               return utcCalendar;
+       }
+
+       //~ Inner Classes 
----------------------------------------------------------
+
+       /**
+        * Immutable value holder bundling a {@link Calendar}, a fraction string and
+        * a precision. Helper class for
+        * {@link DateTimeUtils#parsePrecisionDateTimeLiteral}
+        */
+       public static class PrecisionTime {
+               private final Calendar calendar;
+               private final String fractionText;
+               private final int precisionValue;
+
+               /** Stores the three components as given; none of them is copied. */
+               public PrecisionTime(Calendar cal, String fraction, int precision) {
+                       this.calendar = cal;
+                       this.fractionText = fraction;
+                       this.precisionValue = precision;
+               }
+
+               /** Returns the fraction string passed to the constructor. */
+               public String getFraction() {
+                       return this.fractionText;
+               }
+
+               /** Returns the precision passed to the constructor. */
+               public int getPrecision() {
+                       return this.precisionValue;
+               }
+
+               /** Returns the calendar passed to the constructor. */
+               public Calendar getCalendar() {
+                       return this.calendar;
+               }
+       }
+}
+
+// End DateTimeUtils.java

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
new file mode 100644
index 0000000..0955aeb
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
@@ -0,0 +1,868 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.Strong;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.runtime.PredicateImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.BitSets;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Set;
+
+// This class is copied from Apache Calcite except that it does not
+// automatically name the field using the name of the operators
+// as the Table API rejects special characters like '-' in the field names.
+
+/**
+ * PushProjector is a utility class used to perform operations used in push
+ * projection rules.
+ *
+ * <p>Pushing is particularly interesting in the case of join, because there
+ * are multiple inputs. Generally an expression can be pushed down to a
+ * particular input if it depends upon no other inputs. If it can be pushed
+ * down to both sides, it is pushed down to the left.
+ *
+ * <p>Sometimes an expression needs to be split before it can be pushed down.
+ * To flag that an expression cannot be split, specify a rule that it must be
+ * <dfn>preserved</dfn>. Such an expression will be pushed down intact to one
+ * of the inputs, or not pushed down at all.</p>
+ */
+public class PushProjector {
+  //~ Instance fields --------------------------------------------------------
+
+  private final Project origProj;
+  private final RexNode origFilter;
+  private final RelNode childRel;
+  private final ExprCondition preserveExprCondition;
+  private final RelBuilder relBuilder;
+
+  /**
+   * Original projection expressions
+   */
+  final List<RexNode> origProjExprs;
+
+  /**
+   * Fields from the RelNode that the projection is being pushed past
+   */
+  final List<RelDataTypeField> childFields;
+
+  /**
+   * Number of fields in the RelNode that the projection is being pushed past
+   */
+  final int nChildFields;
+
+  /**
+   * Bitmap containing the references in the original projection
+   */
+  final BitSet projRefs;
+
+  /**
+   * Bitmap containing the fields in the RelNode that the projection is being
+   * pushed past, if the RelNode is not a join. If the RelNode is a join, then
+   * the fields correspond to the left hand side of the join.
+   */
+  final ImmutableBitSet childBitmap;
+
+  /**
+   * Bitmap containing the fields in the right hand side of a join, in the
+   * case where the projection is being pushed past a join. Not used
+   * otherwise.
+   */
+  final ImmutableBitSet rightBitmap;
+
+  /**
+   * Bitmap containing the fields that should be strong, i.e. when preserving
+   * expressions we can only preserve an expression if it is null whenever
+   * these fields are null.
+   */
+  final ImmutableBitSet strongBitmap;
+
+  /**
+   * Number of fields in the RelNode that the projection is being pushed past,
+   * if the RelNode is not a join. If the RelNode is a join, then this is the
+   * number of fields in the left hand side of the join.
+   *
+   * <p>The identity
+   * {@code nChildFields == nSysFields + nFields + nFieldsRight}
+   * holds. {@code nFields} does not include {@code nSysFields}.
+   * The output of a join looks like this:
+   *
+   * <blockquote><pre>
+   * | nSysFields | nFields | nFieldsRight |
+   * </pre></blockquote>
+   *
+   * <p>The output of a single-input rel looks like this:
+   *
+   * <blockquote><pre>
+   * | nSysFields | nFields |
+   * </pre></blockquote>
+   */
+  final int nFields;
+
+  /**
+   * Number of fields in the right hand side of a join, in the case where the
+   * projection is being pushed past a join. Always 0 otherwise.
+   */
+  final int nFieldsRight;
+
+  /**
+   * Number of system fields. System fields appear at the start of a join,
+   * before the first field from the left input.
+   */
+  private final int nSysFields;
+
+  /**
+   * Expressions referenced in the projection/filter that should be preserved.
+   * In the case where the projection is being pushed past a join, then the
+   * list only contains the expressions corresponding to the left hand side of
+   * the join.
+   */
+  final List<RexNode> childPreserveExprs;
+
+  /**
+   * Expressions referenced in the projection/filter that should be preserved,
+   * corresponding to expressions on the right hand side of the join, if the
+   * projection is being pushed past a join. Empty list otherwise.
+   */
+  final List<RexNode> rightPreserveExprs;
+
+  /**
+   * Number of system fields being projected.
+   */
+  int nSystemProject;
+
+  /**
+   * Number of fields being projected. In the case where the projection is
+   * being pushed past a join, the number of fields being projected from the
+   * left hand side of the join.
+   */
+  int nProject;
+
+  /**
+   * Number of fields being projected from the right hand side of a join, in
+   * the case where the projection is being pushed past a join. 0 otherwise.
+   */
+  int nRightProject;
+
+  /**
+   * Rex builder used to create new expressions.
+   */
+  final RexBuilder rexBuilder;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a PushProjector object for pushing projects past a RelNode.
+   *
+   * @param origProj              projection being pushed; may be null when the
+   *                              projection is only implied because a trivial
+   *                              projection was removed
+   * @param origFilter            filter that the projection must also be
+   *                              pushed past, if applicable
+   * @param childRel              RelNode that the projection is being pushed
+   *                              past
+   * @param preserveExprCondition condition deciding whether an expression
+   *                              should be preserved in the projection
+   * @param relBuilder            builder for relational expressions; must not
+   *                              be null
+   */
+  public PushProjector(
+      Project origProj,
+      RexNode origFilter,
+      RelNode childRel,
+      ExprCondition preserveExprCondition,
+      RelBuilder relBuilder) {
+    this.origProj = origProj;
+    this.origFilter = origFilter;
+    this.childRel = childRel;
+    this.preserveExprCondition = preserveExprCondition;
+    this.relBuilder = Preconditions.checkNotNull(relBuilder);
+    this.origProjExprs = origProj == null
+        ? ImmutableList.<RexNode>of()
+        : origProj.getProjects();
+
+    this.childFields = childRel.getRowType().getFieldList();
+    this.nChildFields = childFields.size();
+    this.projRefs = new BitSet(nChildFields);
+
+    if (!(childRel instanceof Join)) {
+      // Single input: every field belongs to the one child and there are no
+      // system fields.
+      this.nFields = nChildFields;
+      this.nFieldsRight = 0;
+      this.nSysFields = 0;
+      this.childBitmap = ImmutableBitSet.range(nChildFields);
+      this.rightBitmap = null;
+      this.strongBitmap = ImmutableBitSet.of();
+    } else {
+      final Join join = (Join) childRel;
+      final int leftCount =
+          join.getLeft().getRowType().getFieldList().size();
+      final int rightCount =
+          join.getRight().getRowType().getFieldList().size();
+      this.nFields = leftCount;
+      // A semi-join does not count the right input's fields.
+      this.nFieldsRight = childRel instanceof SemiJoin ? 0 : rightCount;
+      this.nSysFields = join.getSystemFieldList().size();
+
+      // Layout of the join output: | system | left | right |
+      final int leftEnd = nFields + nSysFields;
+      this.childBitmap = ImmutableBitSet.range(nSysFields, leftEnd);
+      this.rightBitmap = ImmutableBitSet.range(leftEnd, nChildFields);
+
+      final ImmutableBitSet strong;
+      switch (join.getJoinType()) {
+      case INNER:
+        strong = ImmutableBitSet.of();
+        break;
+      case RIGHT:  // All the left-input's columns must be strong
+        strong = ImmutableBitSet.range(nSysFields, leftEnd);
+        break;
+      case LEFT: // All the right-input's columns must be strong
+        strong = ImmutableBitSet.range(leftEnd, nChildFields);
+        break;
+      case FULL:
+      default:
+        strong = ImmutableBitSet.range(nSysFields, nChildFields);
+        break;
+      }
+      this.strongBitmap = strong;
+    }
+    assert nChildFields == nSysFields + nFields + nFieldsRight;
+
+    this.childPreserveExprs = Lists.newArrayList();
+    this.rightPreserveExprs = Lists.newArrayList();
+
+    this.rexBuilder = childRel.getCluster().getRexBuilder();
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  /**
+   * Decomposes a projection into the input references used by the projection
+   * and by a filter, either of which is optional. When both are present, the
+   * filter sits underneath the project.
+   *
+   * <p>Builds a projection containing all referenced inputs plus any special
+   * expressions that must be preserved, rewrites the original filter and/or
+   * projection to reference that new projection, and finally places a
+   * projection corresponding to the original one on top.
+   *
+   * @param defaultExpr expression to be used in the projection if no fields
+   *                    or special columns are selected
+   * @return the converted projection if it makes sense to push elements of
+   * the projection; otherwise returns null
+   */
+  public RelNode convertProject(RexNode defaultExpr) {
+    // locate all fields referenced in the projection and filter
+    locateAllRefs();
+
+    // The preserve list is not modified until the default expression is
+    // added below, so one snapshot serves all the checks.
+    final boolean noPreservedExprs = childPreserveExprs.isEmpty();
+
+    // If every column is selected (explicitly or via "select *") there must
+    // be some special expression to preserve; otherwise pushing is pointless.
+    if (origProj == null) {
+      if (noPreservedExprs) {
+        return null;
+      }
+
+      // No projection at all is the same as selecting all fields.
+      if (nChildFields > 0) {
+        // Calling with nChildFields == 0 should be safe but hits
+        // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6222207
+        projRefs.set(0, nChildFields);
+      }
+      nProject = nChildFields;
+    } else if (noPreservedExprs && projRefs.cardinality() == nChildFields) {
+      return null;
+    }
+
+    // Nothing selected from the underlying rel: project the supplied default
+    // expression, or the first column when there is none.
+    if (noPreservedExprs && projRefs.cardinality() == 0) {
+      if (defaultExpr != null) {
+        childPreserveExprs.add(defaultExpr);
+      } else {
+        if (nChildFields == 1) {
+          return null;
+        }
+        projRefs.set(0);
+        nProject = 1;
+      }
+    }
+
+    // New projection over every field referenced by the project or filter.
+    final RelNode pushedProject =
+        createProjectRefsAndExprs(childRel, false, false);
+    final int[] adjustments = getAdjustments();
+
+    // If there is a filter, rewrite it against the projected columns and
+    // place it on top of the projection just created.
+    final RelNode projChild;
+    if (origFilter == null) {
+      projChild = pushedProject;
+    } else {
+      final RexNode rewrittenFilter =
+          convertRefsAndExprs(
+              origFilter,
+              pushedProject.getRowType().getFieldList(),
+              adjustments);
+      relBuilder.push(pushedProject);
+      relBuilder.filter(rewrittenFilter);
+      projChild = relBuilder.build();
+    }
+
+    // Finally put the original project on top, rewritten to reference the
+    // modified projection list; without an original project this creates a
+    // projection that essentially selects all fields.
+    return createNewProject(projChild, adjustments);
+  }
+
+  /**
+   * Locates all references found in either the projection expressions or a
+   * filter, as well as references to expressions that should be preserved.
+   * Based on that, determines whether pushing the projection makes sense.
+   *
+   * <p>As a side effect this fills {@code projRefs},
+   * {@code childPreserveExprs} and {@code rightPreserveExprs}, and resets and
+   * recomputes {@code nSystemProject}, {@code nProject} and
+   * {@code nRightProject}.
+   *
+   * @return true if all inputs from the child that the projection is being
+   * pushed past are referenced in the projection/filter and no special
+   * preserve expressions are referenced; in that case, it does not make sense
+   * to push the projection
+   */
+  public boolean locateAllRefs() {
+    // Visit the projection expressions and the filter, recording referenced
+    // inputs in projRefs and collecting the expressions to preserve.
+    RexUtil.apply(
+        new InputSpecialOpFinder(
+            projRefs,
+            childBitmap,
+            rightBitmap,
+            strongBitmap,
+            preserveExprCondition,
+            childPreserveExprs,
+            rightPreserveExprs),
+        origProjExprs,
+        origFilter);
+
+    // The system fields of each child are always used by the join, even if
+    // they are not projected out of it.
+    // NOTE(review): both ranges below are nSysFields wide, so they are no-ops
+    // whenever nSysFields == 0; the first range starts right after the join's
+    // own system fields -- confirm these are the intended positions.
+    projRefs.set(
+        nSysFields,
+        nSysFields + nSysFields,
+        true);
+    projRefs.set(
+        nSysFields + nFields,
+        nSysFields + nFields + nSysFields,
+        true);
+
+    // Count how many fields are projected.
+    nSystemProject = 0;
+    nProject = 0;
+    nRightProject = 0;
+    for (int bit : BitSets.toIter(projRefs)) {
+      if (bit < nSysFields) {
+        nSystemProject++;
+      } else if (bit < nSysFields + nFields) {
+        nProject++;
+      } else {
+        nRightProject++;
+      }
+    }
+
+    assert nSystemProject + nProject + nRightProject
+        == projRefs.cardinality();
+
+    if ((childRel instanceof Join)
+        || (childRel instanceof SetOp)) {
+      // if nothing is projected from the children, arbitrarily project
+      // the first columns; this is necessary since Fennel doesn't
+      // handle 0-column projections
+      if ((nProject == 0) && (childPreserveExprs.size() == 0)) {
+        projRefs.set(0);
+        nProject = 1;
+      }
+      if (childRel instanceof Join) {
+        if ((nRightProject == 0) && (rightPreserveExprs.size() == 0)) {
+          // NOTE(review): this sets bit nFields, whereas the right input
+          // starts at nSysFields + nFields; the two coincide only when
+          // nSysFields == 0 -- confirm.
+          projRefs.set(nFields);
+          nRightProject = 1;
+        }
+      }
+    }
+
+    // no need to push projections if all children fields are being
+    // referenced and there are no special preserve expressions; note
+    // that we need to do this check after we've handled the 0-column
+    // project cases
+    if (projRefs.cardinality() == nChildFields
+        && childPreserveExprs.size() == 0
+        && rightPreserveExprs.size() == 0) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Creates a projection based on the inputs specified in a bitmap and the
+   * expressions that need to be preserved. The expressions are appended after
+   * the input references.
+   *
+   * @param projChild child that the projection will be created on top of
+   * @param adjust    if true, need to create new projection expressions;
+   *                  otherwise, the existing ones are reused
+   * @param rightSide if true, creating a projection for the right hand side
+   *                  of a join
+   * @return created projection
+   */
+  public Project createProjectRefsAndExprs(
+      RelNode projChild,
+      boolean adjust,
+      boolean rightSide) {
+    List<RexNode> preserveExprs;
+    int nInputRefs;
+    int offset;
+
+    if (rightSide) {
+      preserveExprs = rightPreserveExprs;
+      nInputRefs = nRightProject;
+      offset = nSysFields + nFields;
+    } else {
+      preserveExprs = childPreserveExprs;
+      nInputRefs = nProject;
+      offset = nSysFields;
+    }
+    int refIdx = offset - 1;
+    List<Pair<RexNode, String>> newProjects =
+        new ArrayList<Pair<RexNode, String>>();
+    List<RelDataTypeField> destFields =
+        projChild.getRowType().getFieldList();
+
+    // add on the input references
+    for (int i = 0; i < nInputRefs; i++) {
+      refIdx = projRefs.nextSetBit(refIdx + 1);
+      assert refIdx >= 0;
+      final RelDataTypeField destField = destFields.get(refIdx - offset);
+      newProjects.add(
+          Pair.of(
+              (RexNode) rexBuilder.makeInputRef(
+                  destField.getType(), refIdx - offset),
+              destField.getName()));
+    }
+
+    // add on the expressions that need to be preserved, converting the
+    // arguments to reference the projected columns (if necessary)
+    int[] adjustments = {};
+    if ((preserveExprs.size() > 0) && adjust) {
+      adjustments = new int[childFields.size()];
+      for (int idx = offset; idx < childFields.size(); idx++) {
+        adjustments[idx] = -offset;
+      }
+    }
+    for (RexNode projExpr : preserveExprs) {
+      RexNode newExpr;
+      if (adjust) {
+        newExpr =
+            projExpr.accept(
+                new RelOptUtil.RexInputConverter(
+                    rexBuilder,
+                    childFields,
+                    destFields,
+                    adjustments));
+      } else {
+        newExpr = projExpr;
+      }
+      newProjects.add(
+                Pair.of(
+              newExpr,
+              null));
+    }
+
+    return (Project) RelOptUtil.createProject(
+        projChild,
+        Pair.left(newProjects),
+        Pair.right(newProjects),
+        false,
+        relBuilder);
+  }
+
+  /**
+   * Determines how much each input reference needs to be adjusted as a result
+   * of projection
+   *
+   * @return array indicating how much each input needs to be adjusted by
+   */
+  public int[] getAdjustments() {
+    int[] adjustments = new int[nChildFields];
+    int newIdx = 0;
+    int rightOffset = childPreserveExprs.size();
+    for (int pos : BitSets.toIter(projRefs)) {
+      adjustments[pos] = -(pos - newIdx);
+      if (pos >= nSysFields + nFields) {
+        adjustments[pos] += rightOffset;
+      }
+      newIdx++;
+    }
+    return adjustments;
+  }
+
+  /**
+   * Clones an expression tree and walks through it, adjusting each
+   * RexInputRef index by some amount, and converting expressions that need to
+   * be preserved to field references.
+   *
+   * @param rex         the expression
+   * @param destFields  fields that the new expressions will be referencing
+   * @param adjustments the amount each input reference index needs to be
+   *                    adjusted by
+   * @return modified expression tree
+   */
+  public RexNode convertRefsAndExprs(
+      RexNode rex,
+      List<RelDataTypeField> destFields,
+      int[] adjustments) {
+    return rex.accept(
+        new RefAndExprConverter(
+            rexBuilder,
+            childFields,
+            destFields,
+            adjustments,
+            childPreserveExprs,
+            nProject,
+            rightPreserveExprs,
+            nProject + childPreserveExprs.size() + nRightProject));
+  }
+
+  /**
+   * Creates a new projection based on the original projection, adjusting all
+   * input refs using an adjustment array passed in. If there was no original
+   * projection, create a new one that selects every field from the underlying
+   * rel.
+   *
+   * <p>If the resulting projection would be trivial, return the child.
+   *
+   * @param projChild   child of the new project
+   * @param adjustments array indicating how much each input reference should
+   *                    be adjusted by
+   * @return the created projection
+   */
+  public RelNode createNewProject(RelNode projChild, int[] adjustments) {
+    final List<Pair<RexNode, String>> projects = Lists.newArrayList();
+
+    if (origProj != null) {
+      for (Pair<RexNode, String> p : origProj.getNamedProjects()) {
+        projects.add(
+            Pair.of(
+                convertRefsAndExprs(
+                    p.left,
+                    projChild.getRowType().getFieldList(),
+                    adjustments),
+                p.right));
+      }
+    } else {
+      for (Ord<RelDataTypeField> field : Ord.zip(childFields)) {
+        projects.add(
+            Pair.of(
+                (RexNode) rexBuilder.makeInputRef(
+                    field.e.getType(), field.i), field.e.getName()));
+      }
+    }
+    return RelOptUtil.createProject(
+        projChild,
+        Pair.left(projects),
+        Pair.right(projects),
+        true /* optimize to avoid trivial projections, as per javadoc */,
+        relBuilder);
+  }
+
+  //~ Inner Classes ----------------------------------------------------------
+
+  /**
+   * Visitor which builds a bitmap of the inputs used by an expression, as
+   * well as locating expressions corresponding to special operators.
+   */
+  private class InputSpecialOpFinder extends RexVisitorImpl<Void> {
+    private final BitSet rexRefs;
+    private final ImmutableBitSet leftFields;
+    private final ImmutableBitSet rightFields;
+    private final ImmutableBitSet strongFields;
+    private final ExprCondition preserveExprCondition;
+    private final List<RexNode> preserveLeft;
+    private final List<RexNode> preserveRight;
+    private final Strong strong;
+
+    public InputSpecialOpFinder(
+        BitSet rexRefs,
+        ImmutableBitSet leftFields,
+        ImmutableBitSet rightFields,
+        final ImmutableBitSet strongFields,
+        ExprCondition preserveExprCondition,
+        List<RexNode> preserveLeft,
+        List<RexNode> preserveRight) {
+      super(true);
+      this.rexRefs = rexRefs;
+      this.leftFields = leftFields;
+      this.rightFields = rightFields;
+      this.preserveExprCondition = preserveExprCondition;
+      this.preserveLeft = preserveLeft;
+      this.preserveRight = preserveRight;
+
+      this.strongFields = strongFields;
+      this.strong = Strong.of(strongFields);
+    }
+
+    public Void visitCall(RexCall call) {
+      if (preserve(call)) {
+        return null;
+      }
+      super.visitCall(call);
+      return null;
+    }
+
+    private boolean isStrong(final ImmutableBitSet exprArgs, final RexNode call) {
+      // If the expressions do not use any of the inputs that require output to be null,
+      // no need to check.  Otherwise, check that the expression is null.
+      // For example, in an "left outer join", we don't require that expressions
+      // pushed down into the left input to be strong.  On the other hand,
+      // expressions pushed into the right input must be.  In that case,
+      // strongFields == right input fields.
+      return !strongFields.intersects(exprArgs) || strong.isNull(call);
+    }
+
+    private boolean preserve(RexNode call) {
+      if (preserveExprCondition.test(call)) {
+        // if the arguments of the expression only reference the
+        // left hand side, preserve it on the left; similarly, if
+        // it only references expressions on the right
+        final ImmutableBitSet exprArgs = RelOptUtil.InputFinder.bits(call);
+        if (exprArgs.cardinality() > 0) {
+          if (leftFields.contains(exprArgs) && isStrong(exprArgs, call)) {
+            addExpr(preserveLeft, call);
+            return true;
+          } else if (rightFields.contains(exprArgs) && isStrong(exprArgs, call)) {
+            assert preserveRight != null;
+            addExpr(preserveRight, call);
+            return true;
+          }
+        }
+        // if the expression arguments reference both the left and
+        // right, fall through and don't attempt to preserve the
+        // expression, but instead locate references and special
+        // ops in the call operands
+      }
+      return false;
+    }
+
+    public Void visitInputRef(RexInputRef inputRef) {
+      rexRefs.set(inputRef.getIndex());
+      return null;
+    }
+
+    /**
+     * Adds an expression to a list if the same expression isn't already in
+     * the list. Expressions are identical if their digests are the same.
+     *
+     * @param exprList current list of expressions
+     * @param newExpr  new expression to be added
+     */
+    private void addExpr(List<RexNode> exprList, RexNode newExpr) {
+      String newExprString = newExpr.toString();
+      for (RexNode expr : exprList) {
+        if (newExprString.compareTo(expr.toString()) == 0) {
+          return;
+        }
+      }
+      exprList.add(newExpr);
+    }
+  }
+
+  /**
+   * Walks an expression tree, replacing input refs with new values to reflect
+   * projection and converting special expressions to field references.
+   */
+  private class RefAndExprConverter extends RelOptUtil.RexInputConverter {
+    private final List<RexNode> preserveLeft;
+    private final int firstLeftRef;
+    private final List<RexNode> preserveRight;
+    private final int firstRightRef;
+
+    public RefAndExprConverter(
+        RexBuilder rexBuilder,
+        List<RelDataTypeField> srcFields,
+        List<RelDataTypeField> destFields,
+        int[] adjustments,
+        List<RexNode> preserveLeft,
+        int firstLeftRef,
+        List<RexNode> preserveRight,
+        int firstRightRef) {
+      super(rexBuilder, srcFields, destFields, adjustments);
+      this.preserveLeft = preserveLeft;
+      this.firstLeftRef = firstLeftRef;
+      this.preserveRight = preserveRight;
+      this.firstRightRef = firstRightRef;
+    }
+
+    public RexNode visitCall(RexCall call) {
+      // if the expression corresponds to one that needs to be preserved,
+      // convert it to a field reference; otherwise, convert the entire
+      // expression
+      int match =
+          findExprInLists(
+              call,
+              preserveLeft,
+              firstLeftRef,
+              preserveRight,
+              firstRightRef);
+      if (match >= 0) {
+        return rexBuilder.makeInputRef(
+            destFields.get(match).getType(),
+            match);
+      }
+      return super.visitCall(call);
+    }
+
+    /**
+     * Looks for a matching RexNode from among two lists of RexNodes and
+     * returns the offset into the list corresponding to the match, adjusted
+     * by an amount, depending on whether the match was from the first or
+     * second list.
+     *
+     * @param rex      RexNode that is being matched against
+     * @param rexList1 first list of RexNodes
+     * @param adjust1  adjustment if match occurred in first list
+     * @param rexList2 second list of RexNodes
+     * @param adjust2  adjustment if match occurred in the second list
+     * @return index in the list corresponding to the matching RexNode; -1
+     * if no match
+     */
+    private int findExprInLists(
+        RexNode rex,
+        List<RexNode> rexList1,
+        int adjust1,
+        List<RexNode> rexList2,
+        int adjust2) {
+      int match = findExprInList(rex, rexList1);
+      if (match >= 0) {
+        return match + adjust1;
+      }
+
+      if (rexList2 != null) {
+        match = findExprInList(rex, rexList2);
+        if (match >= 0) {
+          return match + adjust2;
+        }
+      }
+
+      return -1;
+    }
+
+    private int findExprInList(RexNode rex, List<RexNode> rexList) {
+      int match = 0;
+      for (RexNode rexElement : rexList) {
+        if (rexElement.toString().compareTo(rex.toString()) == 0) {
+          return match;
+        }
+        match++;
+      }
+      return -1;
+    }
+  }
+
+  /**
+   * A functor that replies true or false for a given expression.
+   *
+   * @see org.apache.calcite.rel.rules.PushProjector.OperatorExprCondition
+   */
+  public interface ExprCondition extends Predicate<RexNode> {
+    /**
+     * Evaluates a condition for a given expression.
+     *
+     * @param expr Expression
+     * @return result of evaluating the condition
+     */
+    boolean test(RexNode expr);
+
+    /**
+     * Constant condition that replies {@code false} for all expressions.
+     */
+    ExprCondition FALSE =
+        new ExprConditionImpl() {
+          @Override public boolean test(RexNode expr) {
+            return false;
+          }
+        };
+
+    /**
+     * Constant condition that replies {@code true} for all expressions.
+     */
+    ExprCondition TRUE =
+        new ExprConditionImpl() {
+          @Override public boolean test(RexNode expr) {
+            return true;
+          }
+        };
+  }
+
+  /** Implementation of {@link ExprCondition}. */
+  abstract static class ExprConditionImpl extends PredicateImpl<RexNode>
+      implements ExprCondition {
+  }
+
+  /**
+   * An expression condition that evaluates to true if the expression is
+   * a call to one of a set of operators.
+   */
+  class OperatorExprCondition extends ExprConditionImpl {
+    private final Set<SqlOperator> operatorSet;
+
+    /**
+     * Creates an OperatorExprCondition.
+     *
+     * @param operatorSet Set of operators
+     */
+    public OperatorExprCondition(Iterable<? extends SqlOperator> operatorSet) {
+      this.operatorSet = ImmutableSet.copyOf(operatorSet);
+    }
+
+    public boolean test(RexNode expr) {
+      return expr instanceof RexCall
+          && operatorSet.contains(((RexCall) expr).getOperator());
+    }
+  }
+}
+
+// End PushProjector.java

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
deleted file mode 100644
index a57cf10..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.sql.fun;
-
-/*
- * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL CALCITE-1761 IS FIXED.
- */
-
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlOperatorBinding;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlOperandTypeChecker;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-
-/**
- * SQL function that computes keys by which rows can be partitioned and
- * aggregated.
- *
- * <p>Grouped window functions always occur in the GROUP BY clause. They often
- * have auxiliary functions that access information about the group. For
- * example, {@code HOP} is a group function, and its auxiliary functions are
- * {@code HOP_START} and {@code HOP_END}. Here they are used in a streaming
- * query:
- *
- * <blockquote><pre>
- * SELECT STREAM HOP_START(rowtime, INTERVAL '1' HOUR),
- *   HOP_END(rowtime, INTERVAL '1' HOUR),
- *   MIN(unitPrice)
- * FROM Orders
- * GROUP BY HOP(rowtime, INTERVAL '1' HOUR), productId
- * </pre></blockquote>
- */
-class SqlGroupFunction extends SqlFunction {
-       /** The grouped function, if this an auxiliary function; null otherwise. */
-       final SqlGroupFunction groupFunction;
-
-       /** Creates a SqlGroupFunction.
-        *
-        * @param kind Kind; also determines function name
-        * @param groupFunction Group function, if this is an auxiliary;
-        *                      null, if this is a group function
-        * @param operandTypeChecker Operand type checker
-        */
-       SqlGroupFunction(SqlKind kind, SqlGroupFunction groupFunction,
-               SqlOperandTypeChecker operandTypeChecker) {
-               super(kind.name(), kind, ReturnTypes.ARG0, null,
-                       operandTypeChecker, SqlFunctionCategory.SYSTEM);
-               this.groupFunction = groupFunction;
-               if (groupFunction != null) {
-                       assert groupFunction.groupFunction == null;
-               }
-       }
-
-       /** Creates an auxiliary function from this grouped window function. */
-       SqlGroupFunction auxiliary(SqlKind kind) {
-               return new SqlGroupFunction(kind, this, getOperandTypeChecker());
-       }
-
-       /** Returns a list of this grouped window function's auxiliary functions. */
-       List<SqlGroupFunction> getAuxiliaryFunctions() {
-               return ImmutableList.of();
-       }
-
-       @Override public boolean isGroup() {
-               // Auxiliary functions are not group functions
-               return groupFunction == null;
-       }
-
-       @Override public boolean isGroupAuxiliary() {
-               return groupFunction != null;
-       }
-
-       @Override public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
-               // Monotonic iff its first argument is, but not strict.
-               //
-               // Note: This strategy happens to works for all current group functions
-               // (HOP, TUMBLE, SESSION). When there are exceptions to this rule, we'll
-               // make the method abstract.
-               return call.getOperandMonotonicity(0).unstrict();
-       }
-}
-
-// End SqlGroupFunction.java

Reply via email to