http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java new file mode 100644 index 0000000..be1e317 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlQueryParser.java @@ -0,0 +1,75 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/

package org.apache.samza.sql.testutil;

import org.apache.samza.SamzaException;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link SamzaSqlQueryParser}: extraction of the sink, the select statement and the
 * sources from an {@code insert into ... select ...} statement, and rejection of malformed input.
 */
public class TestSamzaSqlQueryParser {

  /**
   * A plain insert-select must yield the sink, the bare select statement and exactly one source.
   */
  @Test
  public void testParseQuery() {
    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar");
    Assert.assertEquals("log.foo", queryInfo.getSink());
    // Expected value first; the original passed the actual value as the failure message.
    Assert.assertEquals("select * from tracking.bar", queryInfo.getSelectQuery());
    Assert.assertEquals(1, queryInfo.getSources().size());
    Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
  }

  /**
   * A join query must report both inputs as sources, in the order they appear in the statement,
   * with the back-ticks around {@code $table} removed.
   */
  @Test
  public void testParseJoinQuery() {
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " join testavro.PROFILE.`$table` as p"
            + " on p.id = pv.profileId";
    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
    Assert.assertEquals(2, queryInfo.getSources().size());
    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
    Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1));
  }

  /**
   * Statements without an insert clause, without a sink, or without a select list must all be
   * rejected with a {@link SamzaException}.
   */
  @Test
  public void testParseInvalidQuery() {
    assertParseFails("select * from tracking.bar");
    assertParseFails("insert into select * from tracking.bar");
    assertParseFails("insert into log.off select from tracking.bar");
  }

  /**
   * Asserts that parsing {@code sql} throws a {@link SamzaException}.
   *
   * @param sql the statement that the parser is expected to reject
   */
  private static void assertParseFails(String sql) {
    try {
      SamzaSqlQueryParser.parseQuery(sql);
    } catch (SamzaException expected) {
      // The parser rejected the statement, which is exactly the outcome under test.
      return;
    }
    // Include the statement so a failure identifies which of the invalid queries was accepted.
    Assert.fail("Expected a samzaException for query: " + sql);
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java new file mode 100644 index 0000000..88ce443 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java @@ -0,0 +1,136 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/

package org.apache.samza.sql.translator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import org.apache.calcite.DataContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.sql.data.Expression;
import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Tests for {@link FilterTranslator}
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(LogicalFilter.class)
public class TestFilterTranslator extends TranslatorTestBase {

  /**
   * Translates a mocked {@link LogicalFilter} and checks, in order, that
   * <ol>
   *   <li>the filter node and its output stream are registered with the context under id 2,</li>
   *   <li>the registered stream is backed by a FILTER operator spec,</li>
   *   <li>initialising the operator's function wires in the context, the filter and the compiled expression,</li>
   *   <li>the filter function returns whatever boolean the compiled expression writes into its result slot.</li>
   * </ol>
   */
  @Test
  public void testTranslate() throws IOException, ClassNotFoundException {
    // Mock the LogicalFilter (id 2) reading from a single input node (id 1).
    LogicalFilter mockFilter = PowerMockito.mock(LogicalFilter.class);
    TranslatorContext mockContext = mock(TranslatorContext.class);
    RelNode mockInput = mock(RelNode.class);
    when(mockFilter.getInput()).thenReturn(mockInput);
    when(mockInput.getId()).thenReturn(1);
    when(mockFilter.getId()).thenReturn(2);

    // The input node resolves to a real MessageStreamImpl so that translate() can chain an operator onto it.
    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
    OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
    MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
    when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
    // Capture the stream that translate() registers for the filter node so it can be inspected below.
    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));

    // Any expression handed to the compiler yields the same mocked Expression.
    RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
    Expression mockExpr = mock(Expression.class);
    when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);

    // Apply translate() method to verify that we are getting the correct filter operator constructed
    FilterTranslator filterTranslator = new FilterTranslator();
    filterTranslator.translate(mockFilter, mockContext);

    // make sure that context has been registered with the LogicalFilter and its output message stream
    verify(mockContext, times(1)).registerRelNode(2, mockFilter);
    verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
    when(mockContext.getRelNode(2)).thenReturn(mockFilter);
    when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
    StreamOperatorSpec filterSpec =
        (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
    assertNotNull(filterSpec);
    // JUnit convention: expected value first, so a failure message reads correctly.
    assertEquals(OperatorSpec.OpCode.FILTER, filterSpec.getOpCode());

    // Verify that the init() method will establish the context for the filter function
    Config mockConfig = mock(Config.class);
    TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
        new HashSet<>(), null, null, null, null, null, null);
    taskContext.setUserContext(mockContext);
    filterSpec.getTransformFn().init(mockConfig, taskContext);
    FilterFunction filterFn = (FilterFunction) Whitebox.getInternalState(filterSpec, "filterFn");
    assertNotNull(filterFn);
    assertEquals(mockContext, Whitebox.getInternalState(filterFn, "context"));
    assertEquals(mockFilter, Whitebox.getInternalState(filterFn, "filter"));
    assertEquals(mockExpr, Whitebox.getInternalState(filterFn, "expr"));

    // Calling filterFn.apply() to verify the filter function is correctly applied to the input message
    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
    SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
    DataContext dataContext = mock(DataContext.class);
    when(mockContext.getExecutionContext()).thenReturn(executionContext);
    when(mockContext.getDataContext()).thenReturn(dataContext);
    Object[] inputValues = mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray();
    Object[] result = new Object[1];

    // The expression reports its verdict by writing into slot 0 of the result array (argument index 3).
    doAnswer(invocation -> {
        Object[] retValue = invocation.getArgumentAt(3, Object[].class);
        retValue[0] = Boolean.TRUE;
        return null;
      }).when(mockExpr).execute(eq(executionContext), eq(dataContext), eq(inputValues), eq(result));
    assertTrue(filterFn.apply(mockInputMsg));

    // Re-stub the same call to write false; the filter function must now drop the message.
    doAnswer(invocation -> {
        Object[] retValue = invocation.getArgumentAt(3, Object[].class);
        retValue[0] = Boolean.FALSE;
        return null;
      }).when(mockExpr).execute(eq(executionContext), eq(dataContext), eq(inputValues), eq(result));
    assertFalse(filterFn.apply(mockInputMsg));
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java 
---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java new file mode 100644 index 0000000..2de4856 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java @@ -0,0 +1,191 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/
package org.apache.samza.sql.translator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.serializers.Serde;
import org.apache.samza.sql.data.Expression;
import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.storage.kv.RocksDbTableDescriptor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Tests for {@link JoinTranslator}
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({LogicalJoin.class, EnumerableTableScan.class})
public class TestJoinTranslator extends TranslatorTestBase {

  /**
   * Translates a mocked inner equi-join whose left input is a table scan (resolved to a table
   * descriptor) and whose right input is a stream, then checks that the registered output stream
   * is backed by a JOIN operator spec whose join function records the table on the left, the
   * stream key at field index 0, and the expected table and output field names.
   */
  @Test
  public void testTranslateStreamToTableJoin() throws IOException, ClassNotFoundException {
    // Mock the LogicalJoin (id 3) with a table scan on the left (id 1) and a generic node on the right (id 2).
    LogicalJoin mockJoin = PowerMockito.mock(LogicalJoin.class);
    TranslatorContext mockContext = mock(TranslatorContext.class);
    RelNode mockLeftInput = PowerMockito.mock(EnumerableTableScan.class);
    RelNode mockRightInput = mock(RelNode.class);
    List<RelNode> inputs = new ArrayList<>();
    inputs.add(mockLeftInput);
    inputs.add(mockRightInput);
    RelOptTable mockLeftTable = mock(RelOptTable.class);
    when(mockLeftInput.getTable()).thenReturn(mockLeftTable);
    // Plain lists instead of double-brace initialisation, which creates an anonymous ArrayList subclass.
    List<String> qualifiedTableName = new ArrayList<>();
    qualifiedTableName.add("test");
    qualifiedTableName.add("LeftTable");
    when(mockLeftTable.getQualifiedName()).thenReturn(qualifiedTableName);
    when(mockLeftInput.getId()).thenReturn(1);
    when(mockRightInput.getId()).thenReturn(2);
    when(mockJoin.getId()).thenReturn(3);
    when(mockJoin.getInputs()).thenReturn(inputs);
    when(mockJoin.getLeft()).thenReturn(mockLeftInput);
    when(mockJoin.getRight()).thenReturn(mockRightInput);

    // Join condition: an EQUALS call over two input references, both at index 0 and both BOOLEAN-typed.
    RexCall mockJoinCondition = mock(RexCall.class);
    when(mockJoinCondition.isAlwaysTrue()).thenReturn(false);
    when(mockJoinCondition.getKind()).thenReturn(SqlKind.EQUALS);
    when(mockJoin.getCondition()).thenReturn(mockJoinCondition);
    RexInputRef mockLeftConditionInput = mock(RexInputRef.class);
    RexInputRef mockRightConditionInput = mock(RexInputRef.class);
    when(mockLeftConditionInput.getIndex()).thenReturn(0);
    when(mockRightConditionInput.getIndex()).thenReturn(0);
    List<RexNode> condOperands = new ArrayList<>();
    condOperands.add(mockLeftConditionInput);
    condOperands.add(mockRightConditionInput);
    when(mockJoinCondition.getOperands()).thenReturn(condOperands);
    RelDataType mockLeftCondDataType = mock(RelDataType.class);
    RelDataType mockRightCondDataType = mock(RelDataType.class);
    when(mockLeftCondDataType.getSqlTypeName()).thenReturn(SqlTypeName.BOOLEAN);
    when(mockRightCondDataType.getSqlTypeName()).thenReturn(SqlTypeName.BOOLEAN);
    when(mockLeftConditionInput.getType()).thenReturn(mockLeftCondDataType);
    when(mockRightConditionInput.getType()).thenReturn(mockRightCondDataType);

    // Row types of both inputs.
    RelDataType mockLeftRowType = mock(RelDataType.class);
    // NOTE(review): the left row type reports zero fields although leftFieldNames below holds one
    // entry. The reason is not evident from this test - confirm against how JoinTranslator uses
    // getFieldCount() before changing it.
    when(mockLeftRowType.getFieldCount()).thenReturn(0);
    when(mockLeftInput.getRowType()).thenReturn(mockLeftRowType);
    List<String> leftFieldNames = new ArrayList<>();
    leftFieldNames.add("test_table_field1");
    List<String> rightStreamFieldNames = new ArrayList<>();
    rightStreamFieldNames.add("test_stream_field1");
    when(mockLeftRowType.getFieldNames()).thenReturn(leftFieldNames);
    RelDataType mockRightRowType = mock(RelDataType.class);
    when(mockRightInput.getRowType()).thenReturn(mockRightRowType);
    when(mockRightRowType.getFieldNames()).thenReturn(rightStreamFieldNames);

    // Both inputs resolve to real MessageStreamImpl instances built on a mocked graph.
    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
    OperatorSpec<Object, SamzaSqlRelMessage> mockLeftInputOp = mock(OperatorSpec.class);
    MessageStream<SamzaSqlRelMessage> mockLeftInputStream = new MessageStreamImpl<>(mockGraph, mockLeftInputOp);
    when(mockContext.getMessageStream(eq(mockLeftInput.getId()))).thenReturn(mockLeftInputStream);
    OperatorSpec<Object, SamzaSqlRelMessage> mockRightInputOp = mock(OperatorSpec.class);
    MessageStream<SamzaSqlRelMessage> mockRightInputStream = new MessageStreamImpl<>(mockGraph, mockRightInputOp);
    when(mockContext.getMessageStream(eq(mockRightInput.getId()))).thenReturn(mockRightInputStream);
    when(mockContext.getStreamGraph()).thenReturn(mockGraph);

    // Any intermediate stream requested from the graph is a keyed stream built from mocked specs.
    InputOperatorSpec mockInputOp = mock(InputOperatorSpec.class);
    OutputStreamImpl mockOutputStream = mock(OutputStreamImpl.class);
    when(mockInputOp.isKeyed()).thenReturn(true);
    when(mockOutputStream.isKeyed()).thenReturn(true);
    IntermediateMessageStreamImpl mockPartitionedStream =
        new IntermediateMessageStreamImpl(mockGraph, mockInputOp, mockOutputStream);
    when(mockGraph.getIntermediateStream(any(String.class), any(Serde.class))).thenReturn(mockPartitionedStream);

    // Capture the stream that translate() registers for the join node so it can be inspected below.
    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(3), any(MessageStream.class));
    RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
    Expression mockExpr = mock(Expression.class);
    when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);

    // The left table name ("test.LeftTable") resolves to an IO config that carries a table descriptor.
    doAnswer(this.getRegisteredTableAnswer()).when(mockGraph).getTable(any(RocksDbTableDescriptor.class));
    when(mockJoin.getJoinType()).thenReturn(JoinRelType.INNER);
    SqlIOResolver mockResolver = mock(SqlIOResolver.class);
    SqlIOConfig mockIOConfig = mock(SqlIOConfig.class);
    TableDescriptor mockTableDesc = mock(TableDescriptor.class);
    when(mockResolver.fetchSourceInfo(String.join(".", qualifiedTableName))).thenReturn(mockIOConfig);
    when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));

    // Apply translate() method to verify that we are getting the correct join operator constructed
    JoinTranslator joinTranslator = new JoinTranslator(3, mockResolver);
    joinTranslator.translate(mockJoin, mockContext);

    // make sure that the context has been registered with the join's output message stream
    verify(mockContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));
    when(mockContext.getRelNode(3)).thenReturn(mockJoin);
    when(mockContext.getMessageStream(3)).thenReturn(this.getRegisteredMessageStream(3));
    StreamTableJoinOperatorSpec joinSpec =
        (StreamTableJoinOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(3), "operatorSpec");
    assertNotNull(joinSpec);
    // JUnit convention: expected value first, so a failure message reads correctly.
    assertEquals(OperatorSpec.OpCode.JOIN, joinSpec.getOpCode());

    // Verify joinSpec has the corresponding setup
    StreamTableJoinFunction joinFn = joinSpec.getJoinFn();
    assertNotNull(joinFn);
    // Null-safe comparison: a missing field value fails the assertion instead of throwing an NPE.
    assertTrue(Boolean.FALSE.equals(Whitebox.getInternalState(joinFn, "isTablePosOnRight")));
    List<Integer> expectedStreamFieldIds = new ArrayList<>();
    expectedStreamFieldIds.add(0);
    assertEquals(expectedStreamFieldIds, Whitebox.getInternalState(joinFn, "streamFieldIds"));
    assertEquals(leftFieldNames, Whitebox.getInternalState(joinFn, "tableFieldNames"));
    // Output fields are the table (left) fields followed by the stream (right) fields.
    List<String> outputFieldNames = new ArrayList<>();
    outputFieldNames.addAll(leftFieldNames);
    outputFieldNames.addAll(rightStreamFieldNames);
    assertEquals(outputFieldNames, Whitebox.getInternalState(joinFn, "outFieldNames"));
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java new file mode 100644 index 0000000..f84dd3f --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java @@ -0,0 +1,289 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/
package org.apache.samza.sql.translator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import org.apache.calcite.DataContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.calcite.util.Pair;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.TimerFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.sql.data.Expression;
import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


/**
 * Tests for {@link ProjectTranslator}
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(LogicalProject.class)
public class TestProjectTranslator extends TranslatorTestBase {

  /**
   * Translates a mocked single-field {@link LogicalProject} and checks that the node and its
   * output stream are registered under id 2 and that the resulting MAP operator projects the
   * field as expected.
   */
  @Test
  public void testTranslate() throws IOException, ClassNotFoundException {
    // Mock the LogicalProject (id 2) reading from a single input node (id 1) and projecting one field.
    LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
    TranslatorContext mockContext = mock(TranslatorContext.class);
    RelNode mockInput = mock(RelNode.class);
    List<RelNode> inputs = new ArrayList<>();
    inputs.add(mockInput);
    when(mockInput.getId()).thenReturn(1);
    when(mockProject.getId()).thenReturn(2);
    when(mockProject.getInputs()).thenReturn(inputs);
    when(mockProject.getInput()).thenReturn(mockInput);
    RelDataType mockRowType = mock(RelDataType.class);
    when(mockRowType.getFieldCount()).thenReturn(1);
    when(mockProject.getRowType()).thenReturn(mockRowType);
    RexNode mockRexField = mock(RexNode.class);
    List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
    namedProjects.add(Pair.of(mockRexField, "test_field"));
    when(mockProject.getNamedProjects()).thenReturn(namedProjects);

    // The input node resolves to a real MessageStreamImpl so that translate() can chain an operator onto it.
    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
    OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
    MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
    when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
    // Capture the stream that translate() registers for the project node so it can be inspected below.
    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
    RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
    Expression mockExpr = mock(Expression.class);
    when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);

    // Apply translate() method to verify that we are getting the correct map operator constructed
    ProjectTranslator projectTranslator = new ProjectTranslator();
    projectTranslator.translate(mockProject, mockContext);

    // make sure that context has been registered with the LogicalProject and its output message stream
    verify(mockContext, times(1)).registerRelNode(2, mockProject);
    verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
    when(mockContext.getRelNode(2)).thenReturn(mockProject);
    when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));

    verifyProjectMapOperator(mockProject, mockContext, mockExpr);
  }

  /**
   * Translates a mocked {@link LogicalProject} whose projection list contains a call to the
   * {@code flatten} UDF. Checks the flatten operator chained onto the input stream (a scalar field
   * passes through as one message, a 10-element list becomes 10 messages) and then the MAP
   * operator registered for the project node.
   */
  @Test
  public void testTranslateWithFlatten() throws IOException, ClassNotFoundException {
    // Mock the LogicalProject (id 2) reading from a single input node (id 1) and projecting one field.
    LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
    TranslatorContext mockContext = mock(TranslatorContext.class);
    RelNode mockInput = mock(RelNode.class);
    List<RelNode> inputs = new ArrayList<>();
    inputs.add(mockInput);
    when(mockInput.getId()).thenReturn(1);
    when(mockProject.getId()).thenReturn(2);
    when(mockProject.getInputs()).thenReturn(inputs);
    when(mockProject.getInput()).thenReturn(mockInput);
    RelDataType mockRowType = mock(RelDataType.class);
    when(mockRowType.getFieldCount()).thenReturn(1);
    when(mockProject.getRowType()).thenReturn(mockRowType);
    RexNode mockRexField = mock(RexNode.class);
    List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
    namedProjects.add(Pair.of(mockRexField, "test_field"));
    when(mockProject.getNamedProjects()).thenReturn(namedProjects);

    // The projection list holds one call to a UDF named "flatten" applied to input field 0.
    List<RexNode> flattenProjects = new ArrayList<>();
    RexCall mockFlattenProject = mock(RexCall.class);
    SqlUserDefinedFunction sqlFlattenUdf = mock(SqlUserDefinedFunction.class);
    when(sqlFlattenUdf.getName()).thenReturn("flatten");
    List<RexNode> flattenUdfOperands = new ArrayList<>();
    RexInputRef rexInputRef = mock(RexInputRef.class);
    when(rexInputRef.getIndex()).thenReturn(0);
    flattenUdfOperands.add(rexInputRef);
    when(mockFlattenProject.getOperands()).thenReturn(flattenUdfOperands);
    // The operator is injected straight into the mock's "op" field rather than stubbed via a getter.
    Whitebox.setInternalState(mockFlattenProject, "op", sqlFlattenUdf);
    flattenProjects.add(mockFlattenProject);
    when(mockProject.getProjects()).thenReturn(flattenProjects);

    // A concrete OperatorSpec (not a mock) so the operators chained onto the input can be read back later.
    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
    OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = new OperatorSpec(OperatorSpec.OpCode.INPUT, "1") {

      @Override
      public WatermarkFunction getWatermarkFn() {
        return null;
      }

      @Override
      public TimerFunction getTimerFn() {
        return null;
      }
    };

    MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
    when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
    // Capture the stream that translate() registers for the project node so it can be inspected below.
    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
    RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
    Expression mockExpr = mock(Expression.class);
    when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);

    // Apply translate() method to verify that we are getting the correct flatten and map operators constructed
    ProjectTranslator projectTranslator = new ProjectTranslator();
    projectTranslator.translate(mockProject, mockContext);

    // make sure that context has been registered with the LogicalProject and its output message stream
    verify(mockContext, times(1)).registerRelNode(2, mockProject);
    verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
    when(mockContext.getRelNode(2)).thenReturn(mockProject);
    when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));

    // The first operator chained onto the input stream is taken to be the flatten operator.
    Collection<OperatorSpec> nextOps =
        ((OperatorSpec) Whitebox.getInternalState(mockStream, "operatorSpec")).getRegisteredOperatorSpecs();
    StreamOperatorSpec flattenOp = (StreamOperatorSpec) nextOps.iterator().next();
    assertNotNull(flattenOp);

    // A non-list field value passes through flatten unchanged as a single message.
    Object testObj = new Object();
    List<String> scalarFieldNames = listOf("test_field_no1");
    List<Object> scalarFieldValues = listOf(testObj);
    SamzaSqlRelMessage scalarMsg = new SamzaSqlRelMessage(scalarFieldNames, scalarFieldValues);
    Collection<SamzaSqlRelMessage> flattenedMsgs = flattenOp.getTransformFn().apply(scalarMsg);
    assertEquals(1, flattenedMsgs.size());
    assertTrue(flattenedMsgs.stream().anyMatch(s -> s.getSamzaSqlRelRecord().getFieldValues().get(0).equals(testObj)));

    // A 10-element list field is expanded into 10 messages, one per element, in order.
    List<Integer> testList = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      testList.add(i);
    }
    List<String> listFieldNames = listOf("test_list_field1");
    List<Object> listFieldValues = TestProjectTranslator.<Object>listOf(testList);
    SamzaSqlRelMessage listMsg = new SamzaSqlRelMessage(listFieldNames, listFieldValues);
    flattenedMsgs = flattenOp.getTransformFn().apply(listMsg);
    assertEquals(10, flattenedMsgs.size());
    List<Integer> actualList = flattenedMsgs.stream()
        .map(m -> ((List<Integer>) m.getSamzaSqlRelRecord().getFieldValues().get(0)).get(0))
        .collect(ArrayList::new, (c, a) -> c.add(a), (c1, c2) -> c1.addAll(c2));
    assertEquals(testList, actualList);

    verifyProjectMapOperator(mockProject, mockContext, mockExpr);
  }

  /**
   * Shared verification of the operator registered for the project node (id 2): it must be a MAP
   * operator whose function, once initialised, holds the context, the project node and the
   * compiled expression, and whose output message carries the single projected field
   * {@code "test_field"} with the value the expression writes into its result slot.
   *
   * @param mockProject the mocked project node that was translated
   * @param mockContext the mocked translator context used for the translation
   * @param mockExpr the mocked expression returned by the expression compiler
   */
  private void verifyProjectMapOperator(LogicalProject mockProject, TranslatorContext mockContext, Expression mockExpr)
      throws IOException, ClassNotFoundException {
    StreamOperatorSpec projectSpec =
        (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
    assertNotNull(projectSpec);
    // JUnit convention: expected value first, so a failure message reads correctly.
    assertEquals(OperatorSpec.OpCode.MAP, projectSpec.getOpCode());

    // Verify that the init() method will establish the context for the map function
    Config mockConfig = mock(Config.class);
    TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
        new HashSet<>(), null, null, null, null, null, null);
    taskContext.setUserContext(mockContext);
    projectSpec.getTransformFn().init(mockConfig, taskContext);
    MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
    assertNotNull(mapFn);
    assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
    assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
    assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));

    // Calling mapFn.apply() to verify the map function is correctly applied to the input message
    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
    SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
    DataContext dataContext = mock(DataContext.class);
    when(mockContext.getExecutionContext()).thenReturn(executionContext);
    when(mockContext.getDataContext()).thenReturn(dataContext);
    Object[] result = new Object[1];
    final Object mockFieldObj = new Object();

    // The expression reports the projected value by writing into slot 0 of the result array (argument index 3).
    doAnswer(invocation -> {
        Object[] retValue = invocation.getArgumentAt(3, Object[].class);
        retValue[0] = mockFieldObj;
        return null;
      }).when(mockExpr).execute(eq(executionContext), eq(dataContext),
        eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
    SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
    assertEquals(listOf("test_field"), retMsg.getSamzaSqlRelRecord().getFieldNames());
    assertEquals(listOf(mockFieldObj), retMsg.getSamzaSqlRelRecord().getFieldValues());
  }

  /**
   * Builds a mutable one-element list without double-brace initialisation, which would create an
   * anonymous {@link ArrayList} subclass at every call site.
   *
   * @param element the only element of the returned list
   * @return a new {@link ArrayList} containing just {@code element}
   */
  private static <T> List<T> listOf(T element) {
    List<T> list = new ArrayList<>();
    list.add(element);
    return list;
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java new file mode 100644 index 0000000..65b8c8c --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java @@ -0,0 +1,596 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
*/

package org.apache.samza.sql.translator;

import java.util.HashSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;

/**
 * Tests for {@link QueryTranslator}.
 *
 * <p>Each test stores one SQL statement under {@link SamzaSqlApplicationConfig#CFG_SQL_STMT},
 * translates the first parsed query into a {@link StreamGraphSpec}, and then either inspects the
 * resulting {@link OperatorSpecGraph} or relies on {@code @Test(expected = ...)} to assert that the
 * translation fails.
 */
public class TestQueryTranslator {

  /** Physical name asserted for the intermediate stream in the stream-table join tests. */
  private static final String PARTITION_BY_STREAM = "sql-job-1-partition_by-stream_1";

  /** Extra configs handed to {@code fetchStaticConfigsWithFactories(configs, n)}; filled in {@link #setUp()}. */
  private final Map<String, String> configs = new HashMap<>();

  @Before
  public void setUp() {
    configs.put("job.default.system", "kafka");
  }

  // ------------------------------------------------------------------------------------------
  // Shared helpers
  // ------------------------------------------------------------------------------------------

  /**
   * Computes the Samza job config for the given SQL application config map.
   *
   * @param config SQL application configs, already containing the SQL statement
   * @return the config returned by {@code SamzaSqlApplicationRunner.computeSamzaConfigs(true, ...)}
   */
  private static Config computeSamzaConfig(Map<String, String> config) {
    return SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
  }

  /**
   * Translates the first query found in {@code config} into a freshly created {@link StreamGraphSpec}.
   *
   * <p>The statements run in the same order as the code that used to be repeated in every test, so
   * a failure in any step still surfaces from the calling test method.
   *
   * @param config SQL application configs, already containing the SQL statement
   * @param samzaConfig job config used to build the runner and the graph spec
   * @return the graph spec populated by {@link QueryTranslator#translate}
   */
  private static StreamGraphSpec translate(Map<String, String> config, Config samzaConfig) {
    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
    QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
    SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
    StreamGraphSpec graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
    translator.translate(queryInfo, graphSpec);
    return graphSpec;
  }

  /**
   * Stores {@code sql} in {@code config}, computes the job config and translates the query.
   * Used by the tests that only care whether translation succeeds or throws.
   *
   * @param config mutable SQL application configs; the SQL statement is written into it
   * @param sql the SQL statement to translate
   * @return the translated graph spec
   */
  private static StreamGraphSpec translateSql(Map<String, String> config, String sql) {
    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
    return translate(config, computeSamzaConfig(config));
  }

  /**
   * Asserts that the graph has exactly one output stream, {@code testavro.outputTopic}, and exactly
   * one input operator on system {@code testavro} with the given physical name.
   */
  private static void assertSingleInputToOutputTopic(OperatorSpecGraph specGraph, String expectedInputName) {
    Assert.assertEquals(1, specGraph.getOutputStreams().size());
    Assert.assertEquals("testavro",
        specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
    Assert.assertEquals("outputTopic",
        specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
    Assert.assertEquals(1, specGraph.getInputOperators().size());
    Assert.assertEquals("testavro",
        specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
    Assert.assertEquals(expectedInputName,
        specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
  }

  /**
   * Asserts the graph shape shared by the successful stream-table join tests: two output streams
   * (the kafka partition-by stream, then {@code testavro.enrichedPageViewTopic}) and three input
   * operators (the two {@code testavro} sources in the given order, then the kafka partition-by stream).
   *
   * @param specGraph graph produced by the translation
   * @param firstInputName expected physical name of the first input operator's stream
   * @param secondInputName expected physical name of the second input operator's stream
   */
  private static void assertStreamTableJoinGraph(OperatorSpecGraph specGraph, String firstInputName,
      String secondInputName) {
    Assert.assertEquals(2, specGraph.getOutputStreams().size());
    Assert.assertEquals("kafka",
        specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
    Assert.assertEquals(PARTITION_BY_STREAM,
        specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
    Assert.assertEquals("testavro",
        specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
    Assert.assertEquals("enrichedPageViewTopic",
        specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());

    Assert.assertEquals(3, specGraph.getInputOperators().size());
    Assert.assertEquals("testavro",
        specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
    Assert.assertEquals(firstInputName,
        specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
    Assert.assertEquals("testavro",
        specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
    Assert.assertEquals(secondInputName,
        specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
    Assert.assertEquals("kafka",
        specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
    Assert.assertEquals(PARTITION_BY_STREAM,
        specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
  }

  /**
   * Validates that {@code clonedContext} is a distinct {@link TranslatorContext} that shares the
   * expression compiler, stream graph and the internal registries with {@code originContext},
   * while its data context and execution context differ.
   */
  private void validateClonedTranslatorContext(TranslatorContext originContext, TranslatorContext clonedContext) {
    Assert.assertNotEquals(originContext, clonedContext);
    Assert.assertSame(originContext.getExpressionCompiler(), clonedContext.getExpressionCompiler());
    Assert.assertSame(originContext.getStreamGraph(), clonedContext.getStreamGraph());
    // Field names are looked up reflectively and must match the TranslatorContext declaration exactly.
    Assert.assertSame(Whitebox.getInternalState(originContext, "relSamzaConverters"),
        Whitebox.getInternalState(clonedContext, "relSamzaConverters"));
    Assert.assertSame(Whitebox.getInternalState(originContext, "messsageStreams"),
        Whitebox.getInternalState(clonedContext, "messsageStreams"));
    Assert.assertSame(Whitebox.getInternalState(originContext, "relNodes"),
        Whitebox.getInternalState(clonedContext, "relNodes"));
    Assert.assertNotEquals(originContext.getDataContext(), clonedContext.getDataContext());
    validateClonedExecutionContext(originContext.getExecutionContext(), clonedContext.getExecutionContext());
  }

  /**
   * Validates that {@code clonedContext} is a distinct {@link SamzaSqlExecutionContext} that shares
   * {@code sqlConfig} and {@code udfMetadata} with {@code originContext} but has its own
   * {@code udfInstances}.
   */
  private void validateClonedExecutionContext(SamzaSqlExecutionContext originContext,
      SamzaSqlExecutionContext clonedContext) {
    Assert.assertNotEquals(originContext, clonedContext);
    Assert.assertSame(Whitebox.getInternalState(originContext, "sqlConfig"),
        Whitebox.getInternalState(clonedContext, "sqlConfig"));
    Assert.assertSame(Whitebox.getInternalState(originContext, "udfMetadata"),
        Whitebox.getInternalState(clonedContext, "udfMetadata"));
    Assert.assertNotSame(Whitebox.getInternalState(originContext, "udfInstances"),
        Whitebox.getInternalState(clonedContext, "udfInstances"));
  }

  /**
   * Runs the graph's context manager {@code init()} twice against the same task context and checks
   * that each call installs a {@link TranslatorContext} user context, and that the two instances
   * are clones of each other.
   */
  private void validatePerTaskContextInit(StreamGraphSpec graphSpec, Config samzaConfig) {
    // make sure that each task context would have a separate instance of cloned TranslatorContext
    TaskContextImpl testContext = new TaskContextImpl(new TaskName("Partition 1"), null, null,
        new HashSet<>(), null, null, null, null, null, null);
    // call ContextManager.init() to instantiate the per-task TranslatorContext
    graphSpec.getContextManager().init(samzaConfig, testContext);
    Assert.assertNotNull(testContext.getUserContext());
    Assert.assertTrue(testContext.getUserContext() instanceof TranslatorContext);
    TranslatorContext contextPerTaskOne = (TranslatorContext) testContext.getUserContext();
    // call ContextManager.init() second time to instantiate another clone of TranslatorContext
    graphSpec.getContextManager().init(samzaConfig, testContext);
    Assert.assertTrue(testContext.getUserContext() instanceof TranslatorContext);
    // validate the two copies of TranslatorContext are clones of each other
    validateClonedTranslatorContext(contextPerTaskOne, (TranslatorContext) testContext.getUserContext());
  }

  // ------------------------------------------------------------------------------------------
  // Single-source queries
  // ------------------------------------------------------------------------------------------

  @Test
  public void testTranslate() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
        "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10");
    Config samzaConfig = computeSamzaConfig(config);
    StreamGraphSpec graphSpec = translate(config, samzaConfig);

    assertSingleInputToOutputTopic(graphSpec.getOperatorSpecGraph(), "SIMPLE1");
    validatePerTaskContextInit(graphSpec, samzaConfig);
  }

  @Test
  public void testTranslateComplex() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
        "Insert into testavro.outputTopic select Flatten(array_values) from testavro.COMPLEX1");
    // Alternative windowed query that is currently disabled in this test:
    // config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
    //     "Insert into testavro.foo2 select string_value, SUM(id) from testavro.COMPLEX1 "
    //         + "GROUP BY TumbleWindow(CURRENT_TIME, INTERVAL '1' HOUR), string_value");
    Config samzaConfig = computeSamzaConfig(config);
    StreamGraphSpec graphSpec = translate(config, samzaConfig);

    assertSingleInputToOutputTopic(graphSpec.getOperatorSpecGraph(), "COMPLEX1");
    validatePerTaskContextInit(graphSpec, samzaConfig);
  }

  @Test
  public void testTranslateSubQuery() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
        "Insert into testavro.outputTopic select Flatten(a), id from (select id, array_values a, string_value s from testavro.COMPLEX1)");
    Config samzaConfig = computeSamzaConfig(config);
    StreamGraphSpec graphSpec = translate(config, samzaConfig);

    assertSingleInputToOutputTopic(graphSpec.getOperatorSpecGraph(), "COMPLEX1");
    validatePerTaskContextInit(graphSpec, samzaConfig);
  }

  // ------------------------------------------------------------------------------------------
  // Join queries that are expected to fail
  // ------------------------------------------------------------------------------------------

  @Test (expected = SamzaException.class)
  public void testTranslateStreamTableJoinWithoutJoinOperator() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
            + " where p.id = pv.profileId";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateStreamTableJoinWithFullJoinOperator() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " full join testavro.PROFILE.`$table` as p"
            + " on p.id = pv.profileId";
    translateSql(config, sql);
  }

  @Test (expected = IllegalStateException.class)
  public void testTranslateStreamTableJoinWithSelfJoinOperator() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p1.name as profileName"
            + " from testavro.PROFILE.`$table` as p1"
            + " join testavro.PROFILE.`$table` as p2"
            + " on p1.id = p2.id";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateStreamTableJoinWithThetaCondition() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " join testavro.PROFILE.`$table` as p"
            + " on p.id <> pv.profileId";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateStreamTableCrossJoin() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateStreamTableJoinWithAndLiteralCondition() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " join testavro.PROFILE.`$table` as p"
            + " on p.id = pv.profileId and p.name = 'John'";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateStreamTableJoinWithSubQuery() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " where exists "
            + " (select p.id from testavro.PROFILE.`$table` as p"
            + " where p.id = pv.profileId)";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateTableTableJoin() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW.`$table` as pv"
            + " join testavro.PROFILE.`$table` as p"
            + " on p.id = pv.profileId";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateStreamStreamJoin() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " join testavro.PROFILE as p"
            + " on p.id = pv.profileId";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateJoinWithIncorrectLeftJoin() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW.`$table` as pv"
            + " left join testavro.PROFILE as p"
            + " on p.id = pv.profileId";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateJoinWithIncorrectRightJoin() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " right join testavro.PROFILE.`$table` as p"
            + " on p.id = pv.profileId";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateStreamTableInnerJoinWithMissingStream() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
    // Register the config-based IO resolver factory under the "config" resolver domain.
    String configIOResolverDomain =
        String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
    config.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
        ConfigBasedIOResolverFactory.class.getName());
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " join testavro.`$table` as p"
            + " on p.id = pv.profileId";
    translateSql(config, sql);
  }

  @Test (expected = SamzaException.class)
  public void testTranslateStreamTableInnerJoinWithUdf() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " join testavro.PROFILE.`$table` as p"
            + " on MyTest(p.id) = MyTest(pv.profileId)";
    translateSql(config, sql);
  }

  // ------------------------------------------------------------------------------------------
  // Join queries that are expected to translate
  // ------------------------------------------------------------------------------------------

  @Test
  public void testTranslateStreamTableInnerJoin() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " join testavro.PROFILE.`$table` as p"
            + " on p.id = pv.profileId";
    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
    Config samzaConfig = computeSamzaConfig(config);
    StreamGraphSpec graphSpec = translate(config, samzaConfig);

    assertStreamTableJoinGraph(graphSpec.getOperatorSpecGraph(), "PAGEVIEW", "PROFILE");
    validatePerTaskContextInit(graphSpec, samzaConfig);
  }

  @Test
  public void testTranslateStreamTableLeftJoin() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PAGEVIEW as pv"
            + " left join testavro.PROFILE.`$table` as p"
            + " on p.id = pv.profileId";
    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
    Config samzaConfig = computeSamzaConfig(config);
    StreamGraphSpec graphSpec = translate(config, samzaConfig);

    assertStreamTableJoinGraph(graphSpec.getOperatorSpecGraph(), "PAGEVIEW", "PROFILE");
    validatePerTaskContextInit(graphSpec, samzaConfig);
  }

  @Test
  public void testTranslateStreamTableRightJoin() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
    String sql =
        "Insert into testavro.enrichedPageViewTopic"
            + " select p.name as profileName, pv.pageKey"
            + " from testavro.PROFILE.`$table` as p"
            + " right join testavro.PAGEVIEW as pv"
            + " on p.id = pv.profileId";
    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
    Config samzaConfig = computeSamzaConfig(config);
    StreamGraphSpec graphSpec = translate(config, samzaConfig);

    // The table is on the left of this query, so it is asserted as the first input operator.
    assertStreamTableJoinGraph(graphSpec.getOperatorSpecGraph(), "PROFILE", "PAGEVIEW");
    validatePerTaskContextInit(graphSpec, samzaConfig);
  }

  // ------------------------------------------------------------------------------------------
  // Group-by queries
  // ------------------------------------------------------------------------------------------

  @Test
  public void testTranslateGroupBy() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
    String sql =
        "Insert into testavro.pageViewCountTopic"
            + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"
            + " from testavro.PAGEVIEW as pv"
            + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
            + " group by (pv.pageKey)";
    StreamGraphSpec graphSpec = translateSql(config, sql);
    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();

    Assert.assertEquals(1, specGraph.getInputOperators().size());
    Assert.assertEquals(1, specGraph.getOutputStreams().size());
    Assert.assertTrue(specGraph.hasWindowOrJoins());
    // Previously fetched but never checked: the translated graph must contain operator specs.
    Collection<OperatorSpec> operatorSpecs = specGraph.getAllOperatorSpecs();
    Assert.assertFalse(operatorSpecs.isEmpty());
  }

  @Test (expected = SamzaException.class)
  public void testTranslateGroupByWithSumAggregator() {
    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
    String sql =
        "Insert into testavro.pageViewCountTopic"
            + " select 'SampleJob' as jobName, pv.pageKey, sum(pv.profileId) as `sum`"
            + " from testavro.PAGEVIEW as pv"
            + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'"
            + " group by (pv.pageKey)";
    translateSql(config, sql);
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java new file mode 100644 index 0000000..5dd2d21 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java @@ -0,0 +1,118 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.translator; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.samza.operators.KV; +import org.apache.samza.sql.data.SamzaSqlCompositeKey; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.junit.Assert; +import org.junit.Test; + + +public class TestSamzaSqlRelMessageJoinFunction { + + private List<String> streamFieldNames = Arrays.asList("field1", "field2", "field3", "field4"); + private List<Object> streamFieldValues = Arrays.asList("value1", 1, null, "value4"); + private List<String> tableFieldNames = Arrays.asList("field11", "field12", "field13", "field14"); + private List<Object> tableFieldValues = Arrays.asList("value1", 1, null, "value5"); + + @Test + public void testWithInnerJoinWithTableOnRight() { + SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); + SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues); + JoinRelType joinRelType = JoinRelType.INNER; + List<Integer> streamKeyIds = Arrays.asList(0, 1); + List<Integer> tableKeyIds = Arrays.asList(0, 1); + SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds); + KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg); + + SamzaSqlRelMessageJoinFunction joinFn = + new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, 
tableFieldNames); + SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record); + + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), + outMsg.getSamzaSqlRelRecord().getFieldNames().size()); + List<String> expectedFieldNames = new ArrayList<>(streamFieldNames); + expectedFieldNames.addAll(tableFieldNames); + List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues); + expectedFieldValues.addAll(tableFieldValues); + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues); + } + + @Test + public void testWithInnerJoinWithTableOnLeft() { + SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); + SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues); + JoinRelType joinRelType = JoinRelType.INNER; + List<Integer> streamKeyIds = Arrays.asList(0, 2); + List<Integer> tableKeyIds = Arrays.asList(0, 2); + SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds); + KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg); + + SamzaSqlRelMessageJoinFunction joinFn = + new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames); + SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record); + + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), + outMsg.getSamzaSqlRelRecord().getFieldNames().size()); + List<String> expectedFieldNames = new ArrayList<>(tableFieldNames); + expectedFieldNames.addAll(streamFieldNames); + List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues); + expectedFieldValues.addAll(streamFieldValues); + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues); + } + + @Test + public void testNullRecordWithInnerJoin() { + SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); + JoinRelType 
joinRelType = JoinRelType.INNER; + List<Integer> streamKeyIds = Arrays.asList(0, 1); + + SamzaSqlRelMessageJoinFunction joinFn = + new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames); + SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null); + Assert.assertNull(outMsg); + } + + @Test + public void testNullRecordWithLeftOuterJoin() { + SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); + JoinRelType joinRelType = JoinRelType.LEFT; + List<Integer> streamKeyIds = Arrays.asList(0, 1); + + SamzaSqlRelMessageJoinFunction joinFn = + new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, + tableFieldNames); + SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null); + + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), + outMsg.getSamzaSqlRelRecord().getFieldNames().size()); + List<String> expectedFieldNames = new ArrayList<>(streamFieldNames); + expectedFieldNames.addAll(tableFieldNames); + List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues); + expectedFieldValues.addAll(tableFieldNames.stream().map( name -> null ).collect(Collectors.toList())); + Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java new file mode 100644 index 0000000..a74993f --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TranslatorTestBase.java @@ -0,0 +1,72 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license 
agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.translator; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.operators.TableImpl; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.storage.kv.RocksDbTableProvider; +import org.apache.samza.table.Table; +import org.apache.samza.table.TableSpec; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.mockito.Mockito.*; + + +/** + * Base class for all unit tests for translators + */ +public class TranslatorTestBase { + Map<Integer, MessageStream> registeredStreams = new HashMap<>(); + Map<String, TableImpl> registeredTables = new HashMap<>(); + + Answer getRegisterMessageStreamAnswer() { + return (InvocationOnMock x) -> { + Integer id = x.getArgumentAt(0, Integer.class); + MessageStream stream = x.getArgumentAt(1, MessageStream.class); + registeredStreams.put(id, stream); + return null; + }; + } + + Answer getRegisteredTableAnswer() { + return (InvocationOnMock x) -> { + TableDescriptor 
descriptor = x.getArgumentAt(0, TableDescriptor.class); + TableSpec mockTableSpec = new TableSpec(descriptor.getTableId(), KVSerde.of(new StringSerde(), + new JsonSerdeV2<SamzaSqlRelMessage>()), RocksDbTableProvider.class.getCanonicalName(), new HashMap<>()); + TableImpl mockTable = mock(TableImpl.class); + when(mockTable.getTableSpec()).thenReturn(mockTableSpec); + this.registeredTables.putIfAbsent(descriptor.getTableId(), mockTable); + return this.registeredTables.get(descriptor.getTableId()); + }; + } + + MessageStream getRegisteredMessageStream(int id) { + return this.registeredStreams.get(id); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java new file mode 100644 index 0000000..c029eb4 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.example; + +import java.time.Duration; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.AccumulationMode; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.util.CommandLine; + + +/** + * Example code to implement window-based counter + */ +public class AppWithGlobalConfigExample implements StreamApplication { + + // local execution mode + public static void main(String[] args) { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + LocalApplicationRunner runner = new LocalApplicationRunner(config); + AppWithGlobalConfigExample app = new AppWithGlobalConfigExample(); + runner.run(app); + runner.waitForFinish(); + } + + @Override + public void init(StreamGraph graph, Config config) { + graph.getInputStream("myPageViewEevent", KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class))) + .map(KV::getValue) + .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1, null, null) + .setEarlyTrigger(Triggers.repeat(Triggers.count(5))) + .setAccumulationMode(AccumulationMode.DISCARDING), "w1") + .map(m -> KV.of(m.getKey().getKey(), new PageViewCount(m))) + .sendTo(graph.getOutputStream("pageViewEventPerMemberStream", KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewCount.class)))); + } + + class PageViewEvent { + String pageId; + String memberId; + long timestamp; + + 
PageViewEvent(String pageId, String memberId, long timestamp) { + this.pageId = pageId; + this.memberId = memberId; + this.timestamp = timestamp; + } + } + + static class PageViewCount { + String memberId; + long timestamp; + int count; + + PageViewCount(WindowPane<String, Integer> m) { + this.memberId = m.getKey().getKey(); + this.timestamp = Long.valueOf(m.getKey().getPaneId()); + this.count = m.getMessage(); + } + } +}
