http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java deleted file mode 100644 index 2ca0a98..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test for BeamSqlOverlayExpression. - */ -public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase { - - @Test public void accept() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertTrue(new BeamSqlOverlayExpression(operands).accept()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); - assertTrue(new BeamSqlOverlayExpression(operands).accept()); - } - - @Test public void evaluate() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)); - Assert.assertEquals("w3resou3rce", - new BeamSqlOverlayExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4)); - Assert.assertEquals("w3resou33rce", - new BeamSqlOverlayExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5)); - Assert.assertEquals("w3resou3rce", - new BeamSqlOverlayExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7)); - Assert.assertEquals("w3resouce", - new BeamSqlOverlayExpression(operands).evaluate(record).getValue()); - } - -}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java deleted file mode 100644 index a8e3dd2..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Test; - -/** - * Test for BeamSqlPositionExpression. - */ -public class BeamSqlPositionExpressionTest extends BeamSqlFnExecutorTestBase { - - @Test public void accept() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); - assertTrue(new BeamSqlPositionExpression(operands).accept()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertTrue(new BeamSqlPositionExpression(operands).accept()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); - assertFalse(new BeamSqlPositionExpression(operands).accept()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertFalse(new BeamSqlPositionExpression(operands).accept()); - } - - @Test public void evaluate() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); - assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertEquals(-1, new BeamSqlPositionExpression(operands).evaluate(record).getValue()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java deleted file mode 100644 index f23a18d..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Test; - -/** - * Test for BeamSqlStringUnaryExpression. - */ -public class BeamSqlStringUnaryExpressionTest { - - @Test public void accept() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - assertTrue(new BeamSqlCharLengthExpression(operands).accept()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertFalse(new BeamSqlCharLengthExpression(operands).accept()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - assertFalse(new BeamSqlCharLengthExpression(operands).accept()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java deleted file mode 100644 index ea929a4..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Test; - -/** - * Test for BeamSqlSubstringExpression. - */ -public class BeamSqlSubstringExpressionTest extends BeamSqlFnExecutorTestBase { - - @Test public void accept() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertTrue(new BeamSqlSubstringExpression(operands).accept()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); - assertTrue(new BeamSqlSubstringExpression(operands).accept()); - } - - @Test public void evaluate() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertEquals("hello", - new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); - assertEquals("he", - new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5)); - assertEquals("hello", - new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100)); - assertEquals("hello", - new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0)); - assertEquals("", - new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1)); - assertEquals("", - new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1)); - assertEquals("o", - new BeamSqlSubstringExpression(operands).evaluate(record).getValue()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java deleted file mode 100644 index 8b2570e..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.sql.fun.SqlTrimFunction; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test for BeamSqlTrimExpression. - */ -public class BeamSqlTrimExpressionTest extends BeamSqlFnExecutorTestBase { - - @Test public void accept() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello ")); - assertTrue(new BeamSqlTrimExpression(operands).accept()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")); - assertTrue(new BeamSqlTrimExpression(operands).accept()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")); - assertFalse(new BeamSqlTrimExpression(operands).accept()); - } - - @Test public void evaluate() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.LEADING)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")); - Assert.assertEquals("__hehe", - new BeamSqlTrimExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.TRAILING)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe")); - Assert.assertEquals("hehe__", - new BeamSqlTrimExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he")); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "__")); - Assert.assertEquals("__", - new BeamSqlTrimExpression(operands).evaluate(record).getValue()); - - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello ")); - Assert.assertEquals("hello", - new BeamSqlTrimExpression(operands).evaluate(record).getValue()); - } - - @Test public void leadingTrim() throws Exception { - assertEquals("__hehe", - BeamSqlTrimExpression.leadingTrim("hehe__hehe", "he")); - } - - @Test public void trailingTrim() throws Exception { - assertEquals("hehe__", - BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he")); - } - - @Test public void trim() throws Exception { - assertEquals("__", - BeamSqlTrimExpression.leadingTrim( - BeamSqlTrimExpression.trailingTrim("hehe__hehe", "he"), "he" - )); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java deleted file mode 100644 index a225cd6..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.interpreter.operator.string; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutorTestBase; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Test; - -/** - * Test of BeamSqlUpperExpression. - */ -public class BeamSqlUpperExpressionTest extends BeamSqlFnExecutorTestBase { - - @Test public void evaluate() throws Exception { - List<BeamSqlExpression> operands = new ArrayList<>(); - - operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello")); - assertEquals("HELLO", - new BeamSqlUpperExpression(operands).evaluate(record).getValue()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java deleted file mode 100644 index 7b8d9a4..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRelTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.sql.Types; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamIntersectRel}. - */ -public class BeamIntersectRelTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @BeforeClass - public static void prepare() { - sqlEnv.registerTable("ORDER_DETAILS1", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 1L, 1, 1.0, - 2L, 2, 2.0, - 4L, 4, 4.0 - ) - ); - - sqlEnv.registerTable("ORDER_DETAILS2", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 2L, 2, 2.0, - 3L, 3, 3.0 - ) - ); - } - - @Test - public void testIntersect() throws Exception { - String sql = ""; - sql += "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS1 " - + " INTERSECT " - + "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS2 "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 2L, 2, 2.0 - ).getRows()); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testIntersectAll() throws Exception { - String sql = ""; - sql += "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS1 " - + " INTERSECT ALL " - + "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS2 "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).satisfies(new CheckSize(3)); - - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 1L, 1, 1.0, - 2L, 2, 2.0 - ).getRows()); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java deleted file mode 100644 index 2acee82..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelBoundedVsBoundedTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.sql.Types; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Bounded + Bounded Test for {@code BeamJoinRel}. - */ -public class BeamJoinRelBoundedVsBoundedTest { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); - - public static final MockedBoundedTable ORDER_DETAILS1 = - MockedBoundedTable.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price" - ).addRows( - 1, 2, 3, - 2, 3, 3, - 3, 4, 5 - ); - - public static final MockedBoundedTable ORDER_DETAILS2 = - MockedBoundedTable.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price" - ).addRows( - 1, 2, 3, - 2, 3, 3, - 3, 4, 5 - ); - - @BeforeClass - public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS1", ORDER_DETAILS1); - beamSqlEnv.registerTable("ORDER_DETAILS2", ORDER_DETAILS2); - } - - @Test - public void testInnerJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.INTEGER, "order_id0", - Types.INTEGER, "site_id0", - Types.INTEGER, "price0" - ).addRows( - 2, 3, 3, 1, 2, 3 - ).getRows()); - pipeline.run(); - } - - @Test - public void testLeftOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " LEFT OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.enableAbandonedNodeEnforcement(false); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.INTEGER, "order_id0", - Types.INTEGER, "site_id0", - Types.INTEGER, "price0" - ).addRows( - 1, 2, 3, null, null, null, - 2, 3, 3, 1, 2, 3, - 3, 4, 5, null, null, null - ).getRows()); - pipeline.run(); - } - - @Test - public void testRightOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " RIGHT OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.INTEGER, "order_id0", - Types.INTEGER, "site_id0", - Types.INTEGER, "price0" - ).addRows( - 2, 3, 3, 1, 2, 3, - null, null, null, 2, 3, 3, - null, null, null, 3, 4, 5 - ).getRows()); - pipeline.run(); - } - - @Test - public void testFullOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " FULL OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.INTEGER, "order_id0", - Types.INTEGER, "site_id0", - Types.INTEGER, "price0" - ).addRows( - 2, 3, 3, 1, 2, 3, - 1, 2, 3, null, null, null, - 3, 4, 5, null, null, null, - null, null, null, 2, 3, 3, - null, null, null, 3, 4, 5 - ).getRows()); - pipeline.run(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testException_nonEqualJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id>o2.site_id" - ; - - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testException_crossJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2"; - - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java deleted file mode 100644 index e226b70..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.sql.Types; -import java.util.Date; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.transform.BeamSqlOutputToConsoleFn; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Unbounded + Unbounded Test for {@code BeamJoinRel}. - */ -public class BeamJoinRelUnboundedVsBoundedTest { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); - public static final Date FIRST_DATE = new Date(1); - public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); - public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1); - private static final Duration WINDOW_SIZE = Duration.standardHours(1); - - @BeforeClass - public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable - .of( - Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.TIMESTAMP, "order_time" - ) - .timestampColumnIndex(3) - .addRows( - Duration.ZERO, - 1, 1, 1, FIRST_DATE, - 1, 2, 2, FIRST_DATE - ) - .addRows( - WINDOW_SIZE.plus(Duration.standardSeconds(1)), - 2, 2, 3, SECOND_DATE, - 2, 3, 3, SECOND_DATE, - // this late data is omitted - 1, 2, 3, FIRST_DATE - ) - .addRows( - WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)), - 3, 3, 3, THIRD_DATE, - // this late data is omitted - 2, 2, 3, SECOND_DATE - ) - ); - - beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable - .of(Types.INTEGER, "order_id", - Types.VARCHAR, "buyer" - ).addRows( - 1, "james", - 2, "bond" - )); - } - - @Test - public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " JOIN " - + " ORDER_DETAILS1 o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.VARCHAR, "buyer" - ).addRows( - 1, 3, "james", - 2, 5, "bond" - ).getStringRows() - ); - pipeline.run(); - } - - @Test - public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + " ORDER_DETAILS1 o2 " - + " JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " on " - + " o1.order_id=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.VARCHAR, "buyer" - ).addRows( - 1, 3, "james", - 2, 5, "bond" - ).getStringRows() - ); - pipeline.run(); - } - - @Test - public void testLeftOuterJoin() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " LEFT OUTER JOIN " - + " ORDER_DETAILS1 o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld"))); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.VARCHAR, "buyer" - ).addRows( - 1, 3, "james", - 2, 5, "bond", - 3, 3, null - ).getStringRows() - ); - pipeline.run(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testLeftOuterJoinError() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + " ORDER_DETAILS1 o2 " - + " LEFT OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " on " - + " o1.order_id=o2.order_id" - ; - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } - - @Test - public void testRightOuterJoin() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + " ORDER_DETAILS1 o2 " - + " RIGHT OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " on " - + " o1.order_id=o2.order_id" - ; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.VARCHAR, "buyer" - ).addRows( - 1, 3, "james", - 2, 5, "bond", - 3, 3, null - ).getStringRows() - ); - pipeline.run(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testRightOuterJoinError() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " RIGHT OUTER JOIN " - + " ORDER_DETAILS1 o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testFullOuterJoinError() throws Exception { - String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM " - + " ORDER_DETAILS1 o2 " - + " FULL OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " on " - + " o1.order_id=o2.order_id" - ; - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java deleted file mode 100644 index c366a6e..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.sql.Types; -import java.util.Date; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.extensions.sql.transform.BeamSqlOutputToConsoleFn; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Unbounded + Unbounded Test for {@code BeamJoinRel}. - */ -public class BeamJoinRelUnboundedVsUnboundedTest { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv(); - public static final Date FIRST_DATE = new Date(1); - public static final Date SECOND_DATE = new Date(1 + 3600 * 1000); - - private static final Duration WINDOW_SIZE = Duration.standardHours(1); - - @BeforeClass - public static void prepare() { - beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable - .of(Types.INTEGER, "order_id", - Types.INTEGER, "site_id", - Types.INTEGER, "price", - Types.TIMESTAMP, "order_time" - ) - .timestampColumnIndex(3) - .addRows( - Duration.ZERO, - 1, 1, 1, FIRST_DATE, - 1, 2, 6, FIRST_DATE - ) - .addRows( - WINDOW_SIZE.plus(Duration.standardMinutes(1)), - 2, 2, 7, SECOND_DATE, - 2, 3, 8, SECOND_DATE, - // this late record is omitted(First window) - 1, 3, 3, FIRST_DATE - ) - .addRows( - // this late record is omitted(Second window) - WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)), - 2, 3, 3, SECOND_DATE - ) - ); - } - - @Test - public void testInnerJoin() throws Exception { - String sql = "SELECT * FROM " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.INTEGER, "order_id0", - Types.INTEGER, "sum_site_id0").addRows( - 1, 3, 1, 3, - 2, 5, 2, 5 - ).getStringRows() - ); - pipeline.run(); - } - - @Test - public void testLeftOuterJoin() throws Exception { - String sql = "SELECT * FROM " - + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " LEFT OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - // 1, 1 | 1, 3 - // 2, 2 | NULL, NULL - // ---- | ----- - // 2, 2 | 2, 5 - // 3, 3 | NULL, NULL - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.INTEGER, "order_id0", - Types.INTEGER, "sum_site_id0" - ).addRows( - 1, 1, 1, 3, - 2, 2, null, null, - 2, 2, 2, 5, - 3, 3, null, null - ).getStringRows() - ); - pipeline.run(); - } - - @Test - public void testRightOuterJoin() throws Exception { - String sql = "SELECT * FROM " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " RIGHT OUTER JOIN " - + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id", - Types.INTEGER, "order_id0", - Types.INTEGER, "sum_site_id0" - ).addRows( - 1, 3, 1, 1, - null, null, 2, 2, - 2, 5, 2, 2, - null, null, 3, 3 - ).getStringRows() - ); - pipeline.run(); - } - - @Test - public void testFullOuterJoin() throws Exception { - String sql = "SELECT * FROM " - + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " - + " FULL OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " - + " on " - + " o1.order_id1=o2.order_id" - ; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello"))); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "order_id1", - Types.INTEGER, "sum_site_id", - Types.INTEGER, "order_id", - Types.INTEGER, "sum_site_id0" - ).addRows( - 1, 1, 1, 3, - 6, 2, null, null, - 7, 2, null, null, - 8, 3, null, null, - null, null, 2, 5 - ).getStringRows() - ); - pipeline.run(); - } - - @Test(expected = IllegalArgumentException.class) - public void testWindowsMismatch() throws Exception { - String sql = "SELECT * FROM " - + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 " - + " LEFT OUTER JOIN " - + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS " - + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 " - + " on " - + " o1.order_id=o2.order_id" - ; - pipeline.enableAbandonedNodeEnforcement(false); - BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java deleted file mode 100644 index f2ed132..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRelTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.sql.Types; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamMinusRel}. - */ -public class BeamMinusRelTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @BeforeClass - public static void prepare() { - sqlEnv.registerTable("ORDER_DETAILS1", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 1L, 1, 1.0, - 2L, 2, 2.0, - 4L, 4, 4.0, - 4L, 4, 4.0 - ) - ); - - sqlEnv.registerTable("ORDER_DETAILS2", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 2L, 2, 2.0, - 3L, 3, 3.0 - ) - ); - } - - @Test - public void testExcept() throws Exception { - String sql = ""; - sql += "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS1 " - + " EXCEPT " - + "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS2 "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 4L, 4, 4.0 - ).getRows()); - - pipeline.run(); - } - - @Test - public void testExceptAll() throws Exception { - String sql = ""; - sql += "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS1 " - + " EXCEPT ALL " - + "SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS2 "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).satisfies(new CheckSize(2)); - - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 4L, 4, 4.0, - 4L, 4, 4.0 - ).getRows()); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java deleted file mode 100644 index 65dd8af2..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBaseTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.sql.Types; -import java.util.Date; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamSetOperatorRelBase}. - */ -public class BeamSetOperatorRelBaseTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - public static final Date THE_DATE = new Date(100000); - - @BeforeClass - public static void prepare() { - sqlEnv.registerTable("ORDER_DETAILS", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price", - Types.TIMESTAMP, "order_time" - ).addRows( - 1L, 1, 1.0, THE_DATE, - 2L, 2, 2.0, THE_DATE - ) - ); - } - - @Test - public void testSameWindow() throws Exception { - String sql = "SELECT " - + " order_id, site_id, count(*) as cnt " - + "FROM ORDER_DETAILS GROUP BY order_id, site_id" - + ", TUMBLE(order_time, INTERVAL '1' HOUR) " - + " UNION SELECT " - + " order_id, site_id, count(*) as cnt " - + "FROM ORDER_DETAILS GROUP BY order_id, site_id" - + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - // compare valueInString to ignore the windowStart & windowEnd - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.BIGINT, "cnt" - ).addRows( - 1L, 1, 1L, - 2L, 2, 1L - ).getStringRows()); - pipeline.run(); - } - - @Test(expected = IllegalArgumentException.class) - public void testDifferentWindows() throws Exception { - String sql = "SELECT " - + " order_id, site_id, count(*) as cnt " - + "FROM ORDER_DETAILS GROUP BY order_id, site_id" - + ", TUMBLE(order_time, INTERVAL '1' HOUR) " - + " UNION SELECT " - + " order_id, site_id, count(*) as cnt " - + "FROM ORDER_DETAILS GROUP BY order_id, site_id" - + ", TUMBLE(order_time, INTERVAL '2' HOUR) "; - - // use a real pipeline rather than the TestPipeline because we are - // testing exceptions, the pipeline will not actually run. - Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create()); - BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java deleted file mode 100644 index 9e38bb6..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRelTest.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.sql.Types; -import java.util.Date; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamSortRel}. - */ -public class BeamSortRelTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @Before - public void prepare() { - sqlEnv.registerTable("ORDER_DETAILS", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price", - Types.TIMESTAMP, "order_time" - ).addRows( - 1L, 2, 1.0, new Date(), - 1L, 1, 2.0, new Date(), - 2L, 4, 3.0, new Date(), - 2L, 1, 4.0, new Date(), - 5L, 5, 5.0, new Date(), - 6L, 6, 6.0, new Date(), - 7L, 7, 7.0, new Date(), - 8L, 8888, 8.0, new Date(), - 8L, 999, 9.0, new Date(), - 10L, 100, 10.0, new Date() - ) - ); - sqlEnv.registerTable("SUB_ORDER_RAM", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ) - ); - } - - @Test - public void testOrderBy_basic() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + "ORDER BY order_id asc, site_id desc limit 4"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 2, 1.0, - 1L, 1, 2.0, - 2L, 4, 3.0, - 2L, 1, 4.0 - ).getRows()); - pipeline.run().waitUntilFinish(); - } - - @Test - public void testOrderBy_nullsFirst() throws Exception { - sqlEnv.registerTable("ORDER_DETAILS", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 2, 1.0, - 1L, null, 2.0, - 2L, 1, 3.0, - 2L, null, 4.0, - 5L, 5, 5.0 - ) - ); - sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable - .of(Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price")); - - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, null, 2.0, - 1L, 2, 1.0, - 2L, null, 4.0, - 2L, 1, 3.0 - ).getRows() - ); - pipeline.run().waitUntilFinish(); - } - - @Test - public void testOrderBy_nullsLast() throws Exception { - sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable - .of(Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 2, 1.0, - 1L, null, 2.0, - 2L, 1, 3.0, - 2L, null, 4.0, - 5L, 5, 5.0)); - sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable - .of(Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price")); - - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 2, 1.0, - 1L, null, 2.0, - 2L, 1, 3.0, - 2L, null, 4.0 - ).getRows() - ); - pipeline.run().waitUntilFinish(); - } - - @Test - public void testOrderBy_with_offset() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 5L, 5, 5.0, - 6L, 6, 6.0, - 7L, 7, 7.0, - 8L, 8888, 8.0 - ).getRows() - ); - pipeline.run().waitUntilFinish(); - } - - @Test - public void testOrderBy_bigFetch() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + "ORDER BY order_id asc, site_id desc limit 11"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 2, 1.0, - 1L, 1, 2.0, - 2L, 4, 3.0, - 2L, 1, 4.0, - 5L, 5, 5.0, - 6L, 6, 6.0, - 7L, 7, 7.0, - 8L, 8888, 8.0, - 8L, 999, 9.0, - 10L, 100, 10.0 - ).getRows() - ); - pipeline.run().waitUntilFinish(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testOrderBy_exception() throws Exception { - String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id) SELECT " - + " order_id, COUNT(*) " - + "FROM ORDER_DETAILS " - + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)" - + "ORDER BY order_id asc limit 11"; - - TestPipeline pipeline = TestPipeline.create(); - BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java deleted file mode 100644 index 54524df..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRelTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.sql.Types; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamUnionRel}. - */ -public class BeamUnionRelTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @BeforeClass - public static void prepare() { - sqlEnv.registerTable("ORDER_DETAILS", - MockedBoundedTable.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 2L, 2, 2.0 - ) - ); - } - - @Test - public void testUnion() throws Exception { - String sql = "SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS " - + " UNION SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS "; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 2L, 2, 2.0 - ).getRows() - ); - pipeline.run(); - } - - @Test - public void testUnionAll() throws Exception { - String sql = "SELECT " - + " order_id, site_id, price " - + "FROM ORDER_DETAILS" - + " UNION ALL " - + " SELECT order_id, site_id, price " - + "FROM ORDER_DETAILS"; - - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.BIGINT, "order_id", - Types.INTEGER, "site_id", - Types.DOUBLE, "price" - ).addRows( - 1L, 1, 1.0, - 1L, 1, 1.0, - 2L, 2, 2.0, - 2L, 2, 2.0 - ).getRows() - ); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRelTest.java deleted file mode 100644 index ace1a3e..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRelTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import java.sql.Types; -import org.apache.beam.sdk.extensions.sql.BeamSqlCli; -import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable; -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** - * Test for {@code BeamValuesRel}. - */ -public class BeamValuesRelTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - @BeforeClass - public static void prepare() { - sqlEnv.registerTable("string_table", - MockedBoundedTable.of( - Types.VARCHAR, "name", - Types.VARCHAR, "description" - ) - ); - sqlEnv.registerTable("int_table", - MockedBoundedTable.of( - Types.INTEGER, "c0", - Types.INTEGER, "c1" - ) - ); - } - - @Test - public void testValues() throws Exception { - String sql = "insert into string_table(name, description) values " - + "('hello', 'world'), ('james', 'bond')"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.VARCHAR, "name", - Types.VARCHAR, "description" - ).addRows( - "hello", "world", - "james", "bond" - ).getRows() - ); - pipeline.run(); - } - - @Test - public void testValues_castInt() throws Exception { - String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "c0", - Types.INTEGER, "c1" - ).addRows( - 1, 2 - ).getRows() - ); - pipeline.run(); - } - - @Test - public void testValues_onlySelect() throws Exception { - String sql = "select 1, '1'"; - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - Types.INTEGER, "EXPR$0", - Types.CHAR, "EXPR$1" - ).addRows( - 1, "1" - ).getRows() - ); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/CheckSize.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/CheckSize.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/CheckSize.java deleted file mode 100644 index f369076..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/rel/CheckSize.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.extensions.sql.rel; - -import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.junit.Assert; - -/** - * Utility class to check size of BeamSQLRow iterable. - */ -public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> { - private int size; - public CheckSize(int size) { - this.size = size; - } - @Override public Void apply(Iterable<BeamSqlRow> input) { - int count = 0; - for (BeamSqlRow row : input) { - count++; - } - Assert.assertEquals(size, count); - return null; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java index 553420b..ddff819 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.schema; import java.math.BigDecimal; import java.util.Date; import java.util.GregorianCalendar; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataType; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java index 4eccc44..05af36c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java @@ -19,10 +19,10 @@ package org.apache.beam.sdk.extensions.sql.schema.kafka; import java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java index 9dc599f..79e3d6d 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java @@ -31,10 +31,10 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java index 571c8ef..821abc9 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/transform/BeamAggregationTransformTest.java @@ -23,12 +23,12 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.extensions.sql.transform.BeamAggregationTransforms; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine;