Repository: beam Updated Branches: refs/heads/DSL_SQL 523482be0 -> 8bb59840b
[BEAM-2195] Implement conditional operator (CASE) Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12dd8046 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12dd8046 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12dd8046 Branch: refs/heads/DSL_SQL Commit: 12dd804678bfaad4705c8f1d50eaf03a086f6daf Parents: 523482b Author: James Xu <[email protected]> Authored: Wed May 10 17:19:48 2017 +0800 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Sat May 13 08:08:23 2017 +0200 ---------------------------------------------------------------------- .../dsls/sql/interpreter/BeamSQLFnExecutor.java | 3 + .../operator/BeamSqlCaseExpression.java | 64 +++++++++++++ .../sql/interpreter/BeamSQLFnExecutorTest.java | 11 +++ .../operator/BeamSqlCaseExpressionTest.java | 94 ++++++++++++++++++++ 4 files changed, 172 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/12dd8046/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java index 4ae7b33..4b7af2a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression; @@ -159,6 +160,8 @@ public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor { case "INITCAP": return new BeamSqlInitCapExpression(subExps); + case "CASE": + return new BeamSqlCaseExpression(subExps); case "IS NULL": return new BeamSqlIsNullExpression(subExps.get(0)); http://git-wip-us.apache.org/repos/asf/beam/blob/12dd8046/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java new file mode 100644 index 0000000..d108abd --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlCaseExpression} represents CASE, NULLIF, COALESCE in SQL. + */ +public class BeamSqlCaseExpression extends BeamSqlExpression { + public BeamSqlCaseExpression(List<BeamSqlExpression> operands) { + // the return type of CASE is the type of the `else` condition + super(operands, operands.get(operands.size() - 1).getOutputType()); + } + + @Override public boolean accept() { + // `when`-`then` pair + `else` + if (operands.size() % 2 != 1) { + return false; + } + + for (int i = 0; i < operands.size() - 1; i += 2) { + if (opType(i) != SqlTypeName.BOOLEAN) { + return false; + } else if (opType(i + 1) != outputType) { + return false; + } + } + + return true; + } + + @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) { + for (int i = 0; i < operands.size() - 1; i += 2) { + if (opValueEvaluated(i, inputRecord)) { + return BeamSqlPrimitive.of( + outputType, + opValueEvaluated(i + 1, inputRecord) + ); + } + } + return BeamSqlPrimitive.of(outputType, + opValueEvaluated(operands.size() - 1, inputRecord)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/12dd8046/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java index d7379fc..ba9f525 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java @@ -23,6 +23,7 @@ import java.math.BigDecimal; import java.util.Arrays; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression; @@ -253,5 +254,15 @@ public class BeamSQLFnExecutorTest extends BeamSQLFnExecutorTestBase { ); exp = BeamSQLFnExecutor.buildExpression(rexNode); assertTrue(exp instanceof BeamSqlOverlayExpression); + + rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CASE, + Arrays.asList( + rexBuilder.makeLiteral(true), + rexBuilder.makeLiteral("HELLO"), + rexBuilder.makeLiteral("HELLO") + ) + ); + exp = BeamSQLFnExecutor.buildExpression(rexNode); + assertTrue(exp instanceof BeamSqlCaseExpression); } } http://git-wip-us.apache.org/repos/asf/beam/blob/12dd8046/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java new file mode 100644 index 0000000..06b5073 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +/** + * Test for BeamSqlCaseExpression. + */ +public class BeamSqlCaseExpressionTest extends BeamSQLFnExecutorTestBase { + + @Test public void accept() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertTrue(new BeamSqlCaseExpression(operands).accept()); + + // even param count + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertFalse(new BeamSqlCaseExpression(operands).accept()); + + // `when` type error + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "error")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertFalse(new BeamSqlCaseExpression(operands).accept()); + + // `then` type mixing + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + assertFalse(new BeamSqlCaseExpression(operands).accept()); + + } + + @Test public void evaluate() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertEquals("hello", new BeamSqlCaseExpression(operands) + .evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertEquals("world", new BeamSqlCaseExpression(operands) + .evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello1")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + assertEquals("hello1", new BeamSqlCaseExpression(operands) + .evaluate(record).getValue()); + } +}
