Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 53d27e6c4 -> 125cef144
CAST operator supporting numeric, date and timestamp types Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/39eedd56 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/39eedd56 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/39eedd56 Branch: refs/heads/DSL_SQL Commit: 39eedd566c3a1825ec3e9be0aa4490cfd824cb30 Parents: 53d27e6 Author: tarushapptech <[email protected]> Authored: Sat Jun 17 13:20:02 2017 +0530 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 10:34:10 2017 -0700 ---------------------------------------------------------------------- .../dsls/sql/interpreter/BeamSqlFnExecutor.java | 4 + .../operator/BeamSqlCastExpression.java | 132 +++++++++++++++++++ .../operator/BeamSqlCastExpressionTest.java | 126 ++++++++++++++++++ 3 files changed, 262 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/39eedd56/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java index 4678da5..2c2efe9 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.List; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCastExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; import 
org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression; @@ -333,6 +334,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { case "CASE": ret = new BeamSqlCaseExpression(subExps); break; + case "CAST": + ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName()); + break; case "IS NULL": ret = new BeamSqlIsNullExpression(subExps.get(0)); http://git-wip-us.apache.org/repos/asf/beam/blob/39eedd56/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java new file mode 100644 index 0000000..7e8ab03 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.sql.Date; +import java.sql.Timestamp; +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.runtime.SqlFunctions; +import org.apache.calcite.sql.type.SqlTypeName; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.DateTimeFormatterBuilder; +import org.joda.time.format.DateTimeParser; + +/** + * Base class to support 'CAST' operations for all {@link SqlTypeName}. + */ +public class BeamSqlCastExpression extends BeamSqlExpression { + + private static final int index = 0; + private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss"; + private static final String outputDateFormat = "yyyy-MM-dd"; + /** + * Date and Timestamp formats used to parse + * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}. + */ + private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder() + .append(null/*printer*/, new DateTimeParser[] { + // date formats + DateTimeFormat.forPattern("yy-MM-dd").getParser(), + DateTimeFormat.forPattern("yy/MM/dd").getParser(), + DateTimeFormat.forPattern("yy.MM.dd").getParser(), + DateTimeFormat.forPattern("yyMMdd").getParser(), + DateTimeFormat.forPattern("yyyyMMdd").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd").getParser(), + DateTimeFormat.forPattern("yyyy/MM/dd").getParser(), + DateTimeFormat.forPattern("yyyy.MM.dd").getParser(), + // datetime formats + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter() + 
.withPivotYear(2020); + + public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) { + super(operands, castType); + } + + @Override + public boolean accept() { + return numberOfOperands() == 1; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + SqlTypeName castOutputType = getOutputType(); + switch (castOutputType) { + case INTEGER: + return BeamSqlPrimitive + .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRecord))); + case DOUBLE: + return BeamSqlPrimitive + .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRecord))); + case SMALLINT: + return BeamSqlPrimitive + .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRecord))); + case TINYINT: + return BeamSqlPrimitive + .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRecord))); + case BIGINT: + return BeamSqlPrimitive + .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRecord))); + case DECIMAL: + return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, + SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRecord))); + case FLOAT: + return BeamSqlPrimitive + .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRecord))); + case CHAR: + case VARCHAR: + return BeamSqlPrimitive + .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRecord).toString()); + case DATE: + return BeamSqlPrimitive + .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRecord), outputDateFormat)); + case TIMESTAMP: + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, + toTimeStamp(opValueEvaluated(index, inputRecord), outputTimestampFormat)); + } + throw new UnsupportedOperationException( + String.format("Cast to type %s not supported", castOutputType)); + } + + private Date toDate(Object inputDate, String outputFormat) { + try { + return Date + .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat)); + } catch 
(IllegalArgumentException | UnsupportedOperationException e) { + throw new UnsupportedOperationException("Can't be cast to type 'Date'"); + } + } + + private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) { + try { + return Timestamp.valueOf( + dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute() + .roundCeilingCopy().toString(outputFormat)); + } catch (IllegalArgumentException | UnsupportedOperationException e) { + throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/39eedd56/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java new file mode 100644 index 0000000..c2fd68d --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test for {@link BeamSqlCastExpression}. + */ +public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase { + + private List<BeamSqlExpression> operands; + + @Before + public void setup() { + operands = new ArrayList<>(); + } + + @Test + public void testForOperands() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "aaa")); + Assert.assertFalse(new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).accept()); + } + + @Test + public void testForIntegerToBigintTypeCasting() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5)); + Assert.assertEquals(5L, + new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong()); + } + + @Test + public void testForDoubleToBigIntCasting() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45)); + Assert.assertEquals(5L, + new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong()); + } + + @Test + public void testForIntegerToDateCast() { + // test for yyyyMMdd format + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521)); + Assert.assertEquals(Date.valueOf("2017-05-21"), + new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue()); + } + + @Test + public void testyyyyMMddDateFormat() { + //test for yyyy-MM-dd format + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21")); + Assert.assertEquals(Date.valueOf("2017-05-21"), + new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue()); + } + + @Test + public void 
testyyMMddDateFormat() { + // test for yy.MM.dd format + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21")); + Assert.assertEquals(Date.valueOf("2017-05-21"), + new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue()); + } + + @Test + public void testForTimestampCastExpression() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989")); + Assert.assertEquals(SqlTypeName.TIMESTAMP, + new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record) + .getOutputType()); + } + + @Test + public void testDateTimeFormatWithMillis() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989")); + Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), + new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); + } + + @Test + public void testDateTimeFormatWithTimezone() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST")); + Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), + new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); + } + + @Test + public void testDateTimeFormat() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59")); + Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"), + new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); + } + + @Test(expected = RuntimeException.class) + public void testForCastTypeNotSupported() { + operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime())); + Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"), + new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue()); + } + +}
