http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java new file mode 100644 index 0000000..2ca0a98 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; + +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for BeamSqlOverlayExpression. + */ +public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase { + + @Test public void accept() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + assertTrue(new BeamSqlOverlayExpression(operands).accept()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); + assertTrue(new BeamSqlOverlayExpression(operands).accept()); + } + + @Test public void evaluate() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)); + Assert.assertEquals("w3resou3rce", + new BeamSqlOverlayExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4)); + Assert.assertEquals("w3resou33rce", + new BeamSqlOverlayExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5)); + Assert.assertEquals("w3resou3rce", + new BeamSqlOverlayExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7)); + Assert.assertEquals("w3resouce", + new BeamSqlOverlayExpression(operands).evaluate(record).getValue()); + } + +}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java new file mode 100644 index 0000000..a8e3dd2 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +/** + * Test for BeamSqlPositionExpression. + */ +public class BeamSqlPositionExpressionTest extends BeamSqlFnExecutorTestBase { + + @Test public void accept() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); + assertTrue(new BeamSqlPositionExpression(operands).accept()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + assertTrue(new BeamSqlPositionExpression(operands).accept()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); + assertFalse(new BeamSqlPositionExpression(operands).accept()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + assertFalse(new BeamSqlPositionExpression(operands).accept()); + } + + @Test public void evaluate() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); + assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + assertEquals(-1, new BeamSqlPositionExpression(operands).evaluate(record).getValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java new file mode 100644 index 0000000..f23a18d --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +/** + * Test for BeamSqlStringUnaryExpression. + */ +public class BeamSqlStringUnaryExpressionTest { + + @Test public void accept() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + assertTrue(new BeamSqlCharLengthExpression(operands).accept()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + assertFalse(new BeamSqlCharLengthExpression(operands).accept()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + assertFalse(new BeamSqlCharLengthExpression(operands).accept()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java new file mode 100644 index 0000000..ea929a4 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +/** + * Test for BeamSqlSubstringExpression. + */ +public class BeamSqlSubstringExpressionTest extends BeamSqlFnExecutorTestBase { + + @Test public void accept() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + assertTrue(new BeamSqlSubstringExpression(operands).accept()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); + assertTrue(new BeamSqlSubstringExpression(operands).accept()); + } + + @Test public void evaluate() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + assertEquals("hello", + new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); + assertEquals("he", + new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5)); + assertEquals("hello", + new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100)); + assertEquals("hello", + new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0)); + assertEquals("", + new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1)); + assertEquals("", + new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1)); + assertEquals("o", + new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java new file mode 100644 index 0000000..8b2570e --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.calcite.sql.fun.SqlTrimFunction; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for BeamSqlTrimExpression. + */ +public class BeamSqlTrimExpressionTest extends BeamSqlFnExecutorTestBase { + + @Test public void accept() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello ")); + assertTrue(new BeamSqlTrimExpression(operands).accept()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")); + assertTrue(new BeamSqlTrimExpression(operands).accept()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")); + assertFalse(new BeamSqlTrimExpression(operands).accept()); + } + + @Test public void evaluate() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.LEADING)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")); + Assert.assertEquals("__hehe", + new BeamSqlTrimExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.TRAILING)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")); + Assert.assertEquals("hehe__", + new BeamSqlTrimExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he")); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "__")); + Assert.assertEquals("__", + new BeamSqlTrimExpression(operands).evaluate(record).getValue()); + + operands.clear(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello ")); + Assert.assertEquals("hello", + new BeamSqlTrimExpression(operands).evaluate(record).getValue()); + } + + @Test public void leadingTrim() throws Exception { + assertEquals("__hehe", + BeamSqlTrimExpression.leadingTrim("hehe__hehe", "he")); + } + + @Test public void trailingTrim() throws Exception { + assertEquals("hehe__", + BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he")); + } + + @Test public void trim() throws Exception { + assertEquals("__", + BeamSqlTrimExpression.leadingTrim( + BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"), "he" + )); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java new file mode 100644 index 0000000..a225cd6 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +/** + * Test of BeamSqlUpperExpression. + */ +public class BeamSqlUpperExpressionTest extends BeamSqlFnExecutorTestBase { + + @Test public void evaluate() throws Exception { + List<BeamSqlExpression> operands = new ArrayList<>(); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); + assertEquals("HELLO", + new BeamSqlUpperExpression(operands).evaluate(record).getValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java new file mode 100644 index 0000000..c7c26eb --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.mock; + +import static org.apache.beam.sdk.extensions.sql.TestUtils.buildBeamSqlRowType; +import static org.apache.beam.sdk.extensions.sql.TestUtils.buildRows; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +/** + * Mocked table for bounded data sources. + */ +public class MockedBoundedTable extends MockedTable { + /** rows written to this table. */ + private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>(); + /** rows flow out from this table. */ + private final List<BeamSqlRow> rows = new ArrayList<>(); + + public MockedBoundedTable(BeamSqlRowType beamSqlRowType) { + super(beamSqlRowType); + } + + /** + * Convenient way to build a mocked bounded table. + * + * <p>e.g. + * + * <pre>{@code + * MockedUnboundedTable + * .of(Types.BIGINT, "order_id", + * Types.INTEGER, "site_id", + * Types.DOUBLE, "price", + * Types.TIMESTAMP, "order_time") + * }</pre> + */ + public static MockedBoundedTable of(final Object... args){ + return new MockedBoundedTable(buildBeamSqlRowType(args)); + } + + /** + * Build a mocked bounded table with the specified type. + */ + public static MockedBoundedTable of(final BeamSqlRowType type) { + return new MockedBoundedTable(type); + } + + + /** + * Add rows to the builder. + * + * <p>Sample usage: + * + * <pre>{@code + * addRows( + * 1, 3, "james", -- first row + * 2, 5, "bond" -- second row + * ... + * ) + * }</pre> + */ + public MockedBoundedTable addRows(Object... args) { + List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args)); + this.rows.addAll(rows); + return this; + } + + @Override + public BeamIOType getSourceType() { + return BeamIOType.BOUNDED; + } + + @Override + public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { + return PBegin.in(pipeline).apply( + "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows)); + } + + @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + return new OutputStore(); + } + + /** + * Keep output in {@code CONTENT} for validation. + * + */ + public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> { + + @Override + public PDone expand(PCollection<BeamSqlRow> input) { + input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() { + @ProcessElement + public void processElement(ProcessContext c) { + CONTENT.add(c.element()); + } + + @Teardown + public void close() { + CONTENT.clear(); + } + + })); + return PDone.in(input.getPipeline()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java new file mode 100644 index 0000000..6017ee7 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedTable.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.mock; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +/** + * Base class for mocked table. + */ +public abstract class MockedTable extends BaseBeamTable { + public static final AtomicInteger COUNTER = new AtomicInteger(); + public MockedTable(BeamSqlRowType beamSqlRowType) { + super(beamSqlRowType); + } + + @Override + public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + throw new UnsupportedOperationException("buildIOWriter unsupported!"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java new file mode 100644 index 0000000..f9ea2ac --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.mock; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.calcite.util.Pair; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A mocked unbounded table. + */ +public class MockedUnboundedTable extends MockedTable { + /** rows flow out from this table with the specified watermark instant. */ + private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>(); + /** specify the index of column in the row which stands for the event time field. */ + private int timestampField; + private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) { + super(beamSqlRowType); + } + + /** + * Convenient way to build a mocked unbounded table. + * + * <p>e.g. + * + * <pre>{@code + * MockedUnboundedTable + * .of(Types.BIGINT, "order_id", + * Types.INTEGER, "site_id", + * Types.DOUBLE, "price", + * Types.TIMESTAMP, "order_time") + * }</pre> + */ + public static MockedUnboundedTable of(final Object... args){ + return new MockedUnboundedTable(TestUtils.buildBeamSqlRowType(args)); + } + + public MockedUnboundedTable timestampColumnIndex(int idx) { + this.timestampField = idx; + return this; + } + + /** + * Add rows to the builder. + * + * <p>Sample usage: + * + * <pre>{@code + * addRows( + * duration, -- duration which stands for the corresponding watermark instant + * 1, 3, "james", -- first row + * 2, 5, "bond" -- second row + * ... + * ) + * }</pre> + */ + public MockedUnboundedTable addRows(Duration duration, Object... args) { + List<BeamSqlRow> rows = TestUtils.buildRows(getRowType(), Arrays.asList(args)); + // record the watermark + rows + this.timestampedRows.add(Pair.of(duration, rows)); + return this; + } + + @Override public BeamIOType getSourceType() { + return BeamIOType.UNBOUNDED; + } + + @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { + TestStream.Builder<BeamSqlRow> values = TestStream.create( + new BeamSqlRowCoder(beamSqlRowType)); + + for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) { + values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey())); + for (int i = 0; i < pair.getValue().size(); i++) { + values = values.addElements(TimestampedValue.of(pair.getValue().get(i), + new Instant(pair.getValue().get(i).getDate(timestampField)))); + } + } + + return pipeline.begin().apply( + "MockedUnboundedTable_" + COUNTER.incrementAndGet(), + values.advanceWatermarkToInfinity()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java new file mode 100644 index 0000000..7b8d9a4 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.sql.Types; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamIntersectRel}. + */ +public class BeamIntersectRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void prepare() { + sqlEnv.registerTable("ORDER_DETAILS1", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 4L, 4, 4.0 + ) + ); + + sqlEnv.registerTable("ORDER_DETAILS2", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0, + 3L, 3, 3.0 + ) + ); + } + + @Test + public void testIntersect() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " INTERSECT " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getRows()); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testIntersectAll() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " INTERSECT ALL " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).satisfies(new CheckSize(3)); + + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getRows()); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java new file mode 100644 index 0000000..2acee82 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.sql.Types; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Bounded + Bounded Test for {@code BeamJoinRel}. + */ +public class BeamJoinRelBoundedVsBoundedTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + + public static final MockedBoundedTable ORDER_DETAILS1 = + MockedBoundedTable.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price" + ).addRows( + 1, 2, 3, + 2, 3, 3, + 3, 4, 5 + ); + + public static final MockedBoundedTable ORDER_DETAILS2 = + MockedBoundedTable.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price" + ).addRows( + 1, 2, 3, + 2, 3, 3, + 3, 4, 5 + ); + + @BeforeClass + public static void prepare() { + beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1); + beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2); + } + + @Test + public void testInnerJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.INTEGER, "order_id0", + Types.INTEGER, "site_id0", + Types.INTEGER, "price0" + ).addRows( + 2, 3, 3, 1, 2, 3 + ).getRows()); + pipeline.run(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " LEFT OUTER JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.enableAbandonedNodeEnforcement(false); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.INTEGER, "order_id0", + Types.INTEGER, "site_id0", + Types.INTEGER, "price0" + ).addRows( + 1, 2, 3, null, null, null, + 2, 3, 3, 1, 2, 3, + 3, 4, 5, null, null, null + ).getRows()); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " RIGHT OUTER JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.INTEGER, "order_id0", + Types.INTEGER, "site_id0", + Types.INTEGER, "price0" + ).addRows( + 2, 3, 3, 1, 2, 3, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getRows()); + pipeline.run(); + } + + @Test + public void testFullOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " FULL OUTER JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.INTEGER, "order_id0", + Types.INTEGER, "site_id0", + Types.INTEGER, "price0" + ).addRows( + 2, 3, 3, 1, 2, 3, + 1, 2, 3, null, null, null, + 3, 4, 5, null, null, null, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getRows()); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testException_nonEqualJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id>o2.site_id" + ; + + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testException_crossJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2"; + + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java new file mode 100644 index 0000000..e226b70 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.sql.Types; +import java.util.Date; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.transform.BeamSqlOutputToConsoleFn; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unbounded + Unbounded Test for {@code BeamJoinRel}. + */ +public class BeamJoinRelUnboundedVsBoundedTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + public static final Date FIRST_DATE = new Date(1); + public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); + public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1); + private static final Duration WINDOW_SIZE = Duration.standardHours(1); + + @BeforeClass + public static void prepare() { + beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable + .of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.TIMESTAMP, "order_time" + ) + .timestampColumnIndex(3) + .addRows( + Duration.ZERO, + 1, 1, 1, FIRST_DATE, + 1, 2, 2, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(Duration.standardSeconds(1)), + 2, 2, 3, SECOND_DATE, + 2, 3, 3, SECOND_DATE, + // this late data is omitted + 1, 2, 3, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)), + 3, 3, 3, THIRD_DATE, + // this late data is omitted + 2, 2, 3, SECOND_DATE + ) + ); + + beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable + .of(Types.INTEGER, "order_id", + Types.VARCHAR, "buyer" + ).addRows( + 1, "james", + 2, "bond" + )); + } + + @Test + public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).addRows( + 1, 3, "james", + 2, 5, "bond" + ).getStringRows() + ); + pipeline.run(); + } + + @Test + public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).addRows( + 1, 3, "james", + 2, 5, "bond" + ).getStringRows() + ); + pipeline.run(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " LEFT OUTER JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld"))); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).addRows( + 1, 3, "james", + 2, 5, "bond", + 3, 3, null + ).getStringRows() + ); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testLeftOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " RIGHT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.VARCHAR, "buyer" + ).addRows( + 1, 3, "james", + 2, 5, "bond", + 3, 3, null + ).getStringRows() + ); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRightOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " RIGHT OUTER JOIN " + + " ORDER_DETAILS1 o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testFullOuterJoinError() throws Exception { + String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " + + " ORDER_DETAILS1 o2 " + + " FULL OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.order_id=o2.order_id" + ; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java new file mode 100644 index 0000000..c366a6e --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.sql.Types; +import java.util.Date; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.transform.BeamSqlOutputToConsoleFn; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Unbounded + Unbounded Test for {@code BeamJoinRel}. + */ +public class BeamJoinRelUnboundedVsUnboundedTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); + public static final Date FIRST_DATE = new Date(1); + public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); + + private static final Duration WINDOW_SIZE = Duration.standardHours(1); + + @BeforeClass + public static void prepare() { + beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable + .of(Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.TIMESTAMP, "order_time" + ) + .timestampColumnIndex(3) + .addRows( + Duration.ZERO, + 1, 1, 1, FIRST_DATE, + 1, 2, 6, FIRST_DATE + ) + .addRows( + WINDOW_SIZE.plus(Duration.standardMinutes(1)), + 2, 2, 7, SECOND_DATE, + 2, 3, 8, SECOND_DATE, + // this late record is omitted(First window) + 1, 3, 3, FIRST_DATE + ) + .addRows( + // this late record is omitted(Second window) + WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)), + 2, 3, 3, SECOND_DATE + ) + ); + } + + @Test + public void testInnerJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id0", + Types.INTEGER, "sum_site_id0").addRows( + 1, 3, 1, 3, + 2, 5, 2, 5 + ).getStringRows() + ); + pipeline.run(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + // 1, 1 | 1, 3 + // 2, 2 | NULL, NULL + // ---- | ----- + // 2, 2 | 2, 5 + // 3, 3 | NULL, NULL + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id0", + Types.INTEGER, "sum_site_id0" + ).addRows( + 1, 1, 1, 3, + 2, 2, null, null, + 2, 2, 2, 5, + 3, 3, null, null + ).getStringRows() + ); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " RIGHT OUTER JOIN " + + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id0", + Types.INTEGER, "sum_site_id0" + ).addRows( + 1, 3, 1, 1, + null, null, 2, 2, + 2, 5, 2, 2, + null, null, 3, 3 + ).getStringRows() + ); + pipeline.run(); + } + + @Test + public void testFullOuterJoin() throws Exception { + String sql = "SELECT * FROM " + + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " FULL OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id1=o2.order_id" + ; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello"))); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id1", + Types.INTEGER, "sum_site_id", + Types.INTEGER, "order_id", + Types.INTEGER, "sum_site_id0" + ).addRows( + 1, 1, 1, 3, + 6, 2, null, null, + 7, 2, null, null, + 8, 3, null, null, + null, null, 2, 5 + ).getStringRows() + ); + pipeline.run(); + } + + @Test(expected = IllegalArgumentException.class) + public void testWindowsMismatch() throws Exception { + String sql = "SELECT * FROM " + + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 " + + " LEFT OUTER JOIN " + + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " + + " on " + + " o1.order_id=o2.order_id" + ; + pipeline.enableAbandonedNodeEnforcement(false); + BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java new file mode 100644 index 0000000..f2ed132 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.sql.Types; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamMinusRel}. + */ +public class BeamMinusRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void prepare() { + sqlEnv.registerTable("ORDER_DETAILS1", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 4L, 4, 4.0, + 4L, 4, 4.0 + ) + ); + + sqlEnv.registerTable("ORDER_DETAILS2", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0, + 3L, 3, 3.0 + ) + ); + } + + @Test + public void testExcept() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " EXCEPT " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 4L, 4, 4.0 + ).getRows()); + + pipeline.run(); + } + + @Test + public void testExceptAll() throws Exception { + String sql = ""; + sql += "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS1 " + + " EXCEPT ALL " + + "SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS2 "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).satisfies(new CheckSize(2)); + + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 4L, 4, 4.0, + 4L, 4, 4.0 + ).getRows()); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java new file mode 100644 index 0000000..65dd8af2 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.sql.Types; +import java.util.Date; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamSetOperatorRelBase}. + */ +public class BeamSetOperatorRelBaseTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + public static final Date THE_DATE = new Date(100000); + + @BeforeClass + public static void prepare() { + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price", + Types.TIMESTAMP, "order_time" + ).addRows( + 1L, 1, 1.0, THE_DATE, + 2L, 2, 2.0, THE_DATE + ) + ); + } + + @Test + public void testSameWindow() throws Exception { + String sql = "SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) " + + " UNION SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + // compare valueInString to ignore the windowStart & windowEnd + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.BIGINT, "cnt" + ).addRows( + 1L, 1, 1L, + 2L, 2, 1L + ).getStringRows()); + pipeline.run(); + } + + @Test(expected = IllegalArgumentException.class) + public void testDifferentWindows() throws Exception { + String sql = "SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '1' HOUR) " + + " UNION SELECT " + + " order_id, site_id, count(*) as cnt " + + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + + ", TUMBLE(order_time, INTERVAL '2' HOUR) "; + + // use a real pipeline rather than the TestPipeline because we are + // testing exceptions, the pipeline will not actually run. + Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create()); + BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java new file mode 100644 index 0000000..9e38bb6 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.sql.Types; +import java.util.Date; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamSortRel}. + */ +public class BeamSortRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + @Before + public void prepare() { + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price", + Types.TIMESTAMP, "order_time" + ).addRows( + 1L, 2, 1.0, new Date(), + 1L, 1, 2.0, new Date(), + 2L, 4, 3.0, new Date(), + 2L, 1, 4.0, new Date(), + 5L, 5, 5.0, new Date(), + 6L, 6, 6.0, new Date(), + 7L, 7, 7.0, new Date(), + 8L, 8888, 8.0, new Date(), + 8L, 999, 9.0, new Date(), + 10L, 100, 10.0, new Date() + ) + ); + sqlEnv.registerTable("SUB_ORDER_RAM", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ) + ); + } + + @Test + public void testOrderBy_basic() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc limit 4"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 2, 1.0, + 1L, 1, 2.0, + 2L, 4, 3.0, + 2L, 1, 4.0 + ).getRows()); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testOrderBy_nullsFirst() throws Exception { + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 2, 1.0, + 1L, null, 2.0, + 2L, 1, 3.0, + 2L, null, 4.0, + 5L, 5, 5.0 + ) + ); + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable + .of(Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price")); + + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, null, 2.0, + 1L, 2, 1.0, + 2L, null, 4.0, + 2L, 1, 3.0 + ).getRows() + ); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testOrderBy_nullsLast() throws Exception { + sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable + .of(Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 2, 1.0, + 1L, null, 2.0, + 2L, 1, 3.0, + 2L, null, 4.0, + 5L, 5, 5.0)); + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable + .of(Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price")); + + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 2, 1.0, + 1L, null, 2.0, + 2L, 1, 3.0, + 2L, null, 4.0 + ).getRows() + ); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testOrderBy_with_offset() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 5L, 5, 5.0, + 6L, 6, 6.0, + 7L, 7, 7.0, + 8L, 8888, 8.0 + ).getRows() + ); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testOrderBy_bigFetch() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + "ORDER BY order_id asc, site_id desc limit 11"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 2, 1.0, + 1L, 1, 2.0, + 2L, 4, 3.0, + 2L, 1, 4.0, + 5L, 5, 5.0, + 6L, 6, 6.0, + 7L, 7, 7.0, + 8L, 8888, 8.0, + 8L, 999, 9.0, + 10L, 100, 10.0 + ).getRows() + ); + pipeline.run().waitUntilFinish(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testOrderBy_exception() throws Exception { + String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id) SELECT " + + " order_id, COUNT(*) " + + "FROM ORDER_DETAILS " + + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)" + + "ORDER BY order_id asc limit 11"; + + TestPipeline pipeline = TestPipeline.create(); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java new file mode 100644 index 0000000..54524df --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rel; + +import java.sql.Types; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Test for {@code BeamUnionRel}. + */ +public class BeamUnionRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void prepare() { + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0 + ) + ); + } + + @Test + public void testUnion() throws Exception { + String sql = "SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS " + + " UNION SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS "; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getRows() + ); + pipeline.run(); + } + + @Test + public void testUnionAll() throws Exception { + String sql = "SELECT " + + " order_id, site_id, price " + + "FROM ORDER_DETAILS" + + " UNION ALL " + + " SELECT order_id, site_id, price " + + "FROM ORDER_DETAILS"; + + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 2L, 2, 2.0 + ).getRows() + ); + pipeline.run(); + } +}