http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java index a0bd45f..8e6f687 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java @@ -31,9 +31,12 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.validate.SqlUserDefinedFunction; import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.sql.data.Expression; import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.task.TaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,37 +49,37 @@ class ProjectTranslator { private static final Logger LOG = LoggerFactory.getLogger(ProjectTranslator.class); - void translate(final Project project, final TranslatorContext context) { - MessageStream<SamzaSqlRelMessage> messageStream = context.getMessageStream(project.getInput().getId()); - List<Integer> flattenProjects = - project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList()); + private static class ProjectMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> { + private transient Project project; + private transient Expression expr; + private transient TranslatorContext context; - if (flattenProjects.size() > 0) { - if (flattenProjects.size() > 1) { - String msg = "Multiple flatten operators in a single query is not supported"; - LOG.error(msg); - throw new SamzaException(msg); - } + private final int projectId; - messageStream = translateFlatten(flattenProjects.get(0), messageStream); + ProjectMapFunction(int projectId) { + this.projectId = projectId; } - Expression expr = context.getExpressionCompiler().compile(project.getInputs(), project.getProjects()); + @Override + public void init(Config config, TaskContext taskContext) { + this.context = (TranslatorContext) taskContext.getUserContext(); + this.project = (Project) this.context.getRelNode(projectId); + this.expr = this.context.getExpressionCompiler().compile(project.getInputs(), project.getProjects()); + } - MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(m -> { + @Override + public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) { RelDataType type = project.getRowType(); Object[] output = new Object[type.getFieldCount()]; expr.execute(context.getExecutionContext(), context.getDataContext(), - m.getSamzaSqlRelRecord().getFieldValues().toArray(), output); + message.getSamzaSqlRelRecord().getFieldValues().toArray(), output); List<String> names = new ArrayList<>(); for (int index = 0; index < output.length; index++) { names.add(index, project.getNamedProjects().get(index).getValue()); } return new SamzaSqlRelMessage(names, Arrays.asList(output)); - }); - - context.registerMessageStream(project.getId(), outputStream); + } } private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer flattenIndex, @@ -106,4 +109,27 @@ class ProjectTranslator { private Integer getProjectIndex(RexNode rexNode) { return ((RexInputRef) ((RexCall) rexNode).getOperands().get(0)).getIndex(); } + + void translate(final Project project, final TranslatorContext context) { + MessageStream<SamzaSqlRelMessage> messageStream = context.getMessageStream(project.getInput().getId()); + List<Integer> flattenProjects = + project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList()); + + if (flattenProjects.size() > 0) { + if (flattenProjects.size() > 1) { + String msg = "Multiple flatten operators in a single query is not supported"; + LOG.error(msg); + throw new SamzaException(msg); + } + messageStream = translateFlatten(flattenProjects.get(0), messageStream); + } + + final int projectId = project.getId(); + + MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(new ProjectMapFunction(projectId)); + + context.registerMessageStream(project.getId(), outputStream); + context.registerRelNode(project.getId(), project); + } + }
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index eda73a7..1db3000 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -19,6 +19,7 @@ package org.apache.samza.sql.translator; +import java.util.Map; import java.util.Optional; import org.apache.calcite.rel.RelNode; @@ -29,11 +30,14 @@ import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.samza.config.Config; +import org.apache.samza.operators.ContextManager; import org.apache.samza.SamzaException; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.sql.data.SamzaSqlExecutionContext; import org.apache.samza.sql.data.SamzaSqlRelMessage; @@ -43,6 +47,7 @@ import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.planner.QueryPlanner; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; +import org.apache.samza.task.TaskContext; import org.apache.samza.table.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,11 +63,33 @@ public class QueryTranslator { private final ScanTranslator scanTranslator; private final SamzaSqlApplicationConfig sqlConfig; + private final Map<String, SamzaRelConverter> converters; + + private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> { + private transient SamzaRelConverter samzaMsgConverter; + private final String outputTopic; + + OutputMapFunction(String outputTopic) { + this.outputTopic = outputTopic; + } + + @Override + public void init(Config config, TaskContext taskContext) { + TranslatorContext context = (TranslatorContext) taskContext.getUserContext(); + this.samzaMsgConverter = context.getMsgConverter(outputTopic); + } + + @Override + public KV<Object, Object> apply(SamzaSqlRelMessage message) { + return this.samzaMsgConverter.convertToSamzaMessage(message); + } + } public QueryTranslator(SamzaSqlApplicationConfig sqlConfig) { this.sqlConfig = sqlConfig; scanTranslator = new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource()); + this.converters = sqlConfig.getSamzaRelConverters(); } public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamGraph streamGraph) { @@ -71,7 +98,7 @@ public class QueryTranslator { sqlConfig.getUdfMetadata()); final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig); final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery()); - final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext); + final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext, this.converters); final RelNode node = relRoot.project(); final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver(); @@ -119,9 +146,8 @@ public class QueryTranslator { String sink = queryInfo.getSink(); SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sink); - SamzaRelConverter samzaMsgConverter = sqlConfig.getSamzaRelConverters().get(queryInfo.getSink()); MessageStreamImpl<SamzaSqlRelMessage> stream = (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId()); - MessageStream<KV<Object, Object>> outputStream = stream.map(samzaMsgConverter::convertToSamzaMessage); + MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(sink)); Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor(); if (!tableDescriptor.isPresent()) { @@ -135,5 +161,19 @@ public class QueryTranslator { } outputStream.sendTo(outputTable); } + + streamGraph.withContextManager(new ContextManager() { + @Override + public void init(Config config, TaskContext taskContext) { + taskContext.setUserContext(context.clone()); + } + + @Override + public void close() { + + } + + }); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java index df88a7c..889ea97 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java @@ -44,20 +44,20 @@ public class SamzaSqlRelMessageJoinFunction private final JoinRelType joinRelType; private final boolean isTablePosOnRight; - private final List<Integer> streamFieldIds; + private final ArrayList<Integer> streamFieldIds; // Table field names are used in the outer join when the table record is not found. - private final List<String> tableFieldNames; - private final List<String> outFieldNames; + private final ArrayList<String> tableFieldNames; + private final ArrayList<String> outFieldNames; - public SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean isTablePosOnRight, + SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean isTablePosOnRight, List<Integer> streamFieldIds, List<String> streamFieldNames, List<String> tableFieldNames) { this.joinRelType = joinRelType; this.isTablePosOnRight = isTablePosOnRight; Validate.isTrue((joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnRight) || (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && !isTablePosOnRight) || joinRelType.compareTo(JoinRelType.INNER) == 0); - this.streamFieldIds = streamFieldIds; - this.tableFieldNames = tableFieldNames; + this.streamFieldIds = new ArrayList<>(streamFieldIds); + this.tableFieldNames = new ArrayList<>(tableFieldNames); this.outFieldNames = new ArrayList<>(); if (isTablePosOnRight) { outFieldNames.addAll(streamFieldNames); http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java index 1f9ed52..fa3d9d3 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java @@ -23,11 +23,14 @@ import java.util.List; import java.util.Map; import org.apache.calcite.rel.core.TableScan; import org.apache.commons.lang.Validate; +import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; +import org.apache.samza.task.TaskContext; import org.apache.samza.sql.interfaces.SqlIOConfig; @@ -45,17 +48,36 @@ class ScanTranslator { this.systemStreamConfig = ssc; } + private static class ScanMapFunction implements MapFunction<KV<Object, Object>, SamzaSqlRelMessage> { + private transient SamzaRelConverter msgConverter; + private final String streamName; + + ScanMapFunction(String sourceStreamName) { + this.streamName = sourceStreamName; + } + + @Override + public void init(Config config, TaskContext taskContext) { + TranslatorContext context = (TranslatorContext) taskContext.getUserContext(); + this.msgConverter = context.getMsgConverter(streamName); + } + + @Override + public SamzaSqlRelMessage apply(KV<Object, Object> message) { + return this.msgConverter.convertToRelMessage(message); + } + } + void translate(final TableScan tableScan, final TranslatorContext context) { StreamGraph streamGraph = context.getStreamGraph(); List<String> tableNameParts = tableScan.getTable().getQualifiedName(); String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts); Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName)); - SamzaRelConverter converter = relMsgConverters.get(sourceName); - String streamName = systemStreamConfig.get(sourceName).getStreamName(); + final String streamName = systemStreamConfig.get(sourceName).getStreamName(); MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream(streamName); - MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(converter::convertToRelMessage); + MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName)); context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream); } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java index fd5195b..7a25efb 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -35,16 +36,25 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.sql.data.RexToJavaCompiler; import org.apache.samza.sql.data.SamzaSqlExecutionContext; +import org.apache.samza.sql.interfaces.SamzaRelConverter; /** * State that is maintained while translating the Calcite relational graph to Samza {@link StreamGraph}. */ -public class TranslatorContext { +public class TranslatorContext implements Cloneable { + /** + * The internal variables that are shared among all cloned {@link TranslatorContext} + */ private final StreamGraph streamGraph; - private final Map<Integer, MessageStream> messsageStreams = new HashMap<>(); private final RexToJavaCompiler compiler; + private final Map<String, SamzaRelConverter> relSamzaConverters; + private final Map<Integer, MessageStream> messsageStreams; + private final Map<Integer, RelNode> relNodes; + /** + * The internal variables that are not shared among all cloned {@link TranslatorContext} + */ private final SamzaSqlExecutionContext executionContext; private final DataContextImpl dataContext; @@ -90,17 +100,42 @@ public class TranslatorContext { } } + private RexToJavaCompiler createExpressionCompiler(RelRoot relRoot) { + RelDataTypeFactory dataTypeFactory = relRoot.project().getCluster().getTypeFactory(); + RexBuilder rexBuilder = new SamzaSqlRexBuilder(dataTypeFactory); + return new RexToJavaCompiler(rexBuilder); + } + + /** + * Private constructor to make a clone of {@link TranslatorContext} object + * + * @param other the original object to copy from + */ + private TranslatorContext(TranslatorContext other) { + this.streamGraph = other.streamGraph; + this.compiler = other.compiler; + this.relSamzaConverters = other.relSamzaConverters; + this.messsageStreams = other.messsageStreams; + this.relNodes = other.relNodes; + this.executionContext = other.executionContext.clone(); + this.dataContext = new DataContextImpl(); + } + /** * Create the instance of TranslatorContext * @param streamGraph Samza's streamGraph that is populated during the translation. * @param relRoot Root of the relational graph from calcite. * @param executionContext the execution context + * @param converters the map of schema to RelData converters */ - public TranslatorContext(StreamGraph streamGraph, RelRoot relRoot, SamzaSqlExecutionContext executionContext) { + TranslatorContext(StreamGraph streamGraph, RelRoot relRoot, SamzaSqlExecutionContext executionContext, Map<String, SamzaRelConverter> converters) { this.streamGraph = streamGraph; this.compiler = createExpressionCompiler(relRoot); this.executionContext = executionContext; this.dataContext = new DataContextImpl(); + this.relSamzaConverters = converters; + this.messsageStreams = new HashMap<>(); + this.relNodes = new HashMap<>(); } /** @@ -112,22 +147,16 @@ public class TranslatorContext { return streamGraph; } - private RexToJavaCompiler createExpressionCompiler(RelRoot relRoot) { - RelDataTypeFactory dataTypeFactory = relRoot.project().getCluster().getTypeFactory(); - RexBuilder rexBuilder = new SamzaSqlRexBuilder(dataTypeFactory); - return new RexToJavaCompiler(rexBuilder); - } - /** * Gets execution context. * * @return the execution context */ - public SamzaSqlExecutionContext getExecutionContext() { + SamzaSqlExecutionContext getExecutionContext() { return executionContext; } - public DataContext getDataContext() { + DataContext getDataContext() { return dataContext; } @@ -136,7 +165,7 @@ public class TranslatorContext { * * @return the expression compiler */ - public RexToJavaCompiler getExpressionCompiler() { + RexToJavaCompiler getExpressionCompiler() { return compiler; } @@ -146,7 +175,7 @@ public class TranslatorContext { * @param id the id * @param stream the stream */ - public void registerMessageStream(int id, MessageStream stream) { + void registerMessageStream(int id, MessageStream stream) { messsageStreams.put(id, stream); } @@ -156,7 +185,29 @@ public class TranslatorContext { * @param id the id * @return the message stream */ - public MessageStream getMessageStream(int id) { + MessageStream getMessageStream(int id) { return messsageStreams.get(id); } + + void registerRelNode(int id, RelNode relNode) { + relNodes.put(id, relNode); + } + + RelNode getRelNode(int id) { + return relNodes.get(id); + } + + SamzaRelConverter getMsgConverter(String source) { + return this.relSamzaConverters.get(source); + } + + /** + * This method helps to create a per task instance of translator context + * + * @return the cloned instance of {@link TranslatorContext} + */ + @Override + public TranslatorContext clone() { + return new TranslatorContext(this); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java deleted file mode 100644 index de0ecf1..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java +++ /dev/null @@ -1,510 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory; -import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; -import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; -import org.apache.samza.sql.testutil.SamzaSqlQueryParser; -import org.apache.samza.sql.testutil.SamzaSqlTestConfig; -import org.apache.samza.sql.translator.QueryTranslator; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - - -public class TestQueryTranslator { - - private final Map<String, String> configs = new HashMap<>(); - - @Before - public void setUp() { - configs.put("job.default.system", "kafka"); - } - - @Test - public void testTranslate() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, - "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10"); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - Assert.assertEquals(1, streamGraph.getOutputStreams().size()); - Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("outputTopic", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals(1, streamGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("SIMPLE1", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); - } - - @Test - public void testTranslateComplex() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, - "Insert into testavro.outputTopic select Flatten(array_values) from testavro.COMPLEX1"); -// config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, -// "Insert into testavro.foo2 select string_value, SUM(id) from testavro.COMPLEX1 " -// + "GROUP BY TumbleWindow(CURRENT_TIME, INTERVAL '1' HOUR), string_value"); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - Assert.assertEquals(1, streamGraph.getOutputStreams().size()); - Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("outputTopic", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals(1, streamGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("COMPLEX1", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); - } - - @Test - public void testTranslateSubQuery() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, - "Insert into testavro.outputTopic select Flatten(a), id from (select id, array_values a, string_value s from testavro.COMPLEX1)"); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - Assert.assertEquals(1, streamGraph.getOutputStreams().size()); - Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("outputTopic", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals(1, streamGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("COMPLEX1", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); - } - - @Test (expected = SamzaException.class) - public void testTranslateStreamTableJoinWithoutJoinOperator() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p" - + " where p.id = pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateStreamTableJoinWithFullJoinOperator() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " full join testavro.PROFILE.`$table` as p" - + " on p.id = pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = IllegalStateException.class) - public void testTranslateStreamTableJoinWithSelfJoinOperator() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p1.name as profileName" - + " from testavro.PROFILE.`$table` as p1" - + " join testavro.PROFILE.`$table` as p2" - + " on p1.id = p2.id"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateStreamTableJoinWithThetaCondition() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " join testavro.PROFILE.`$table` as p" - + " on p.id <> pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateStreamTableCrossJoin() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateStreamTableJoinWithAndLiteralCondition() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " join testavro.PROFILE.`$table` as p" - + " on p.id = pv.profileId and p.name = 'John'"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateStreamTableJoinWithSubQuery() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " where exists " - + " (select p.id from testavro.PROFILE.`$table` as p" - + " where p.id = pv.profileId)"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateTableTableJoin() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW.`$table` as pv" - + " join testavro.PROFILE.`$table` as p" - + " on p.id = pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateStreamStreamJoin() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " join testavro.PROFILE as p" - + " on p.id = pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateJoinWithIncorrectLeftJoin() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW.`$table` as pv" - + " left join testavro.PROFILE as p" - + " on p.id = pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateJoinWithIncorrectRightJoin() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " right join testavro.PROFILE.`$table` as p" - + " on p.id = pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateStreamTableInnerJoinWithMissingStream() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); - String configIOResolverDomain = - String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config"); - config.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY, - ConfigBasedIOResolverFactory.class.getName()); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " join testavro.`$table` as p" - + " on p.id = pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test (expected = SamzaException.class) - public void testTranslateStreamTableInnerJoinWithUdf() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " join testavro.PROFILE.`$table` as p" - + " on MyTest(p.id) = MyTest(pv.profileId)"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } - - @Test - public void testTranslateStreamTableInnerJoin() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " join testavro.PROFILE.`$table` as p" - + " on p.id = pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - - Assert.assertEquals(2, streamGraph.getOutputStreams().size()); - Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("enrichedPageViewTopic", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName()); - - Assert.assertEquals(3, streamGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("PAGEVIEW", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", - streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("PROFILE", - streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName()); - Assert.assertEquals("kafka", - streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", - streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName()); - } - - @Test - public void testTranslateStreamTableLeftJoin() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " left join testavro.PROFILE.`$table` as p" - + " on p.id = pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - - Assert.assertEquals(2, streamGraph.getOutputStreams().size()); - Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", - streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("enrichedPageViewTopic", - streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName()); - - Assert.assertEquals(3, streamGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("PAGEVIEW", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", - streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("PROFILE", - streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName()); - Assert.assertEquals("kafka", - streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", - streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName()); - } - - @Test - public void testTranslateStreamTableRightJoin() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PROFILE.`$table` as p" - + " right join testavro.PAGEVIEW as pv" - + " on p.id = pv.profileId"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - - Assert.assertEquals(2, streamGraph.getOutputStreams().size()); - Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", - streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("enrichedPageViewTopic", - streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName()); - - Assert.assertEquals(3, streamGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("PROFILE", - streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", - streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("PAGEVIEW", - streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName()); - Assert.assertEquals("kafka", - streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", - streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName()); - } - - @Test - public void testTranslateGroupBy() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); - String sql = - "Insert into testavro.pageViewCountTopic" - + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`" - + " from testavro.PAGEVIEW as pv" - + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'" - + " group by (pv.pageKey)"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - - Assert.assertEquals(1, streamGraph.getInputOperators().size()); - Assert.assertEquals(1, streamGraph.getOutputStreams().size()); - Assert.assertTrue(streamGraph.hasWindowOrJoins()); - Collection<OperatorSpec> operatorSpecs = streamGraph.getAllOperatorSpecs(); - } - - @Test (expected = SamzaException.class) - public void testTranslateGroupByWithSumAggregator() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); - String sql = - "Insert into testavro.pageViewCountTopic" - + " select 'SampleJob' as jobName, pv.pageKey, sum(pv.profileId) as `sum`" - + " from testavro.PAGEVIEW as pv" + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'" - + " group by (pv.pageKey)"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); - SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); - translator.translate(queryInfo, streamGraph); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java deleted file mode 100644 index 0d48c56..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java +++ /dev/null @@ -1,95 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql; - -import java.util.HashMap; -import java.util.Map; -import org.apache.samza.SamzaException; -import org.apache.samza.config.MapConfig; -import org.apache.samza.sql.impl.ConfigBasedUdfResolver; -import org.apache.samza.sql.interfaces.SqlIOConfig; -import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; -import org.apache.samza.sql.testutil.SamzaSqlTestConfig; -import org.junit.Assert; -import org.junit.Test; - - -public class TestSamzaSqlApplicationConfig { - - @Test - public void testConfigInit() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1"); - String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config"); - int numUdfs = config.get(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES).split(",").length; - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - Assert.assertEquals(1, samzaSqlApplicationConfig.getQueryInfo().size()); - Assert.assertEquals(numUdfs, samzaSqlApplicationConfig.getUdfMetadata().size()); - Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size()); - Assert.assertEquals(1, samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size()); - } - - @Test - public void testWrongConfigs() { - - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); - - - try { - // Fail because no SQL config - new SamzaSqlApplicationConfig(new MapConfig(config)); - Assert.fail(); - } catch (SamzaException e) { - } - - // Pass - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1"); - new SamzaSqlApplicationConfig(new MapConfig(config)); - testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_IO_RESOLVER); - testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_UDF_RESOLVER); - - String configIOResolverDomain = - String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config"); - String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", "testavro"); - - testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER); - - // Configs for the unused system "log" is not mandatory. - String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", "log"); - testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER); - } - - private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) { - Map<String, String> badConfigs = new HashMap<>(config); - badConfigs.remove(configKey); - new SamzaSqlApplicationConfig(new MapConfig(badConfigs)); - } - - private void testWithoutConfigShouldFail(Map<String, String> config, String configKey) { - Map<String, String> badConfigs = new HashMap<>(config); - badConfigs.remove(configKey); - try { - new SamzaSqlApplicationConfig(new MapConfig(badConfigs)); - Assert.fail(); - } catch (IllegalArgumentException e) { - // swallow - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationRunner.java deleted file mode 100644 index e42b55d..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationRunner.java +++ /dev/null @@ -1,56 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql; - -import java.util.Map; - -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.runtime.RemoteApplicationRunner; -import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; -import org.apache.samza.sql.testutil.SamzaSqlTestConfig; -import org.junit.Assert; - -import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; -import org.junit.Test; - - -public class TestSamzaSqlApplicationRunner { - - @Test - public void testComputeSamzaConfigs() { - Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); - String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1"; - configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql1); - configs.put(SamzaSqlApplicationRunner.RUNNER_CONFIG, SamzaSqlApplicationRunner.class.getName()); - MapConfig samzaConfig = new MapConfig(configs); - Config newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(true, samzaConfig); - Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), LocalApplicationRunner.class.getName()); - // Check whether three new configs added. - Assert.assertEquals(newConfigs.size(), configs.size() + 3); - - newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(false, samzaConfig); - Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), RemoteApplicationRunner.class.getName()); - - // Check whether three new configs added. - Assert.assertEquals(newConfigs.size(), configs.size() + 3); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java deleted file mode 100644 index 5bac472..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlFileParser.java +++ /dev/null @@ -1,58 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql; - -import java.io.File; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.List; - -import org.apache.samza.sql.testutil.SqlFileParser; -import org.junit.Assert; -import org.junit.Test; - - -public class TestSamzaSqlFileParser { - - public static final String TEST_SQL = - "insert into log.outputStream \n" + "\tselect * from brooklin.elasticsearchEnterpriseAccounts\n" - + "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, " - + "description as name3 from brooklin.elasticsearchEnterpriseAccounts\n" + "--insert into log.outputstream \n" - + "insert into log.outputstream \n" + "\n" + "\tselect id, MyTest(id) as id2 \n" + "\n" - + "\tfrom tracking.SamzaSqlTestTopic1_p8"; - - @Test - public void testParseSqlFile() throws IOException { - File tempFile = File.createTempFile("testparser", ""); - PrintWriter fileWriter = new PrintWriter(tempFile.getCanonicalPath()); - fileWriter.println(TEST_SQL); - fileWriter.close(); - - List<String> sqlStmts = SqlFileParser.parseSqlFile(tempFile.getAbsolutePath()); - Assert.assertEquals(3, sqlStmts.size()); - Assert.assertEquals("insert into log.outputStream select * from brooklin.elasticsearchEnterpriseAccounts", - sqlStmts.get(0)); - Assert.assertEquals( - "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, description as name3 from brooklin.elasticsearchEnterpriseAccounts", - sqlStmts.get(1)); - Assert.assertEquals("insert into log.outputstream select id, MyTest(id) as id2 from tracking.SamzaSqlTestTopic1_p8", - sqlStmts.get(2)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java deleted file mode 100644 index 24faf4b..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java +++ /dev/null @@ -1,76 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql; - -import org.apache.samza.SamzaException; -import org.apache.samza.sql.testutil.SamzaSqlQueryParser; -import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo; -import org.junit.Test; - -import junit.framework.Assert; - -public class TestSamzaSqlQueryParser { - - @Test - public void testParseQuery() { - QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar"); - Assert.assertEquals("log.foo", queryInfo.getSink()); - Assert.assertEquals(queryInfo.getSelectQuery(), "select * from tracking.bar", queryInfo.getSelectQuery()); - Assert.assertEquals(1, queryInfo.getSources().size()); - Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0)); - } - - @Test - public void testParseJoinQuery() { - String sql = - "Insert into testavro.enrichedPageViewTopic" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " join testavro.PROFILE.`$table` as p" - + " on p.id = pv.profileId"; - QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql); - Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink()); - Assert.assertEquals(2, queryInfo.getSources().size()); - Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0)); - Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1)); - } - - @Test - public void testParseInvalidQuery() { - - try { - SamzaSqlQueryParser.parseQuery("select * from tracking.bar"); - Assert.fail("Expected a samzaException"); - } catch (SamzaException e) { - } - - try { - SamzaSqlQueryParser.parseQuery("insert into select * from tracking.bar"); - Assert.fail("Expected a samzaException"); - } catch (SamzaException e) { - } - - try { - SamzaSqlQueryParser.parseQuery("insert into log.off select from tracking.bar"); - Assert.fail("Expected a samzaException"); - } catch (SamzaException e) { - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java deleted file mode 100644 index 689af72..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessage.java +++ /dev/null @@ -1,46 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql; - -import java.util.Arrays; -import java.util.List; -import org.apache.samza.sql.data.SamzaSqlRelMessage; -import org.junit.Assert; -import org.junit.Test; - - -public class TestSamzaSqlRelMessage { - - private List<Object> values = Arrays.asList("value1", "value2"); - private List<String> names = Arrays.asList("field1", "field2"); - - @Test - public void testGetField() { - SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); - Assert.assertEquals(values.get(0), message.getSamzaSqlRelRecord().getField(names.get(0)).get()); - Assert.assertEquals(values.get(1), message.getSamzaSqlRelRecord().getField(names.get(1)).get()); - } - - @Test - public void testGetNonExistentField() { - SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); - Assert.assertFalse(message.getSamzaSqlRelRecord().getField("field3").isPresent()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java deleted file mode 100644 index 90fce3b..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java +++ /dev/null @@ -1,119 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.samza.operators.KV; -import org.apache.samza.sql.data.SamzaSqlCompositeKey; -import org.apache.samza.sql.data.SamzaSqlRelMessage; -import org.apache.samza.sql.translator.SamzaSqlRelMessageJoinFunction; -import org.junit.Assert; -import org.junit.Test; - - -public class TestSamzaSqlRelMessageJoinFunction { - - private List<String> streamFieldNames = Arrays.asList("field1", "field2", "field3", "field4"); - private List<Object> streamFieldValues = Arrays.asList("value1", 1, null, "value4"); - private List<String> tableFieldNames = Arrays.asList("field11", "field12", "field13", "field14"); - private List<Object> tableFieldValues = Arrays.asList("value1", 1, null, "value5"); - - @Test - public void testWithInnerJoinWithTableOnRight() { - SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); - SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues); - JoinRelType joinRelType = JoinRelType.INNER; - List<Integer> streamKeyIds = Arrays.asList(0, 1); - List<Integer> tableKeyIds = Arrays.asList(0, 1); - SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds); - KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg); - - SamzaSqlRelMessageJoinFunction joinFn = - new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames); - SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record); - - Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), - outMsg.getSamzaSqlRelRecord().getFieldNames().size()); - List<String> expectedFieldNames = new ArrayList<>(streamFieldNames); - expectedFieldNames.addAll(tableFieldNames); - List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues); - expectedFieldValues.addAll(tableFieldValues); - Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues); - } - - @Test - public void testWithInnerJoinWithTableOnLeft() { - SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); - SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues); - JoinRelType joinRelType = JoinRelType.INNER; - List<Integer> streamKeyIds = Arrays.asList(0, 2); - List<Integer> tableKeyIds = Arrays.asList(0, 2); - SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds); - KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg); - - SamzaSqlRelMessageJoinFunction joinFn = - new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames); - SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record); - - Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), - outMsg.getSamzaSqlRelRecord().getFieldNames().size()); - List<String> expectedFieldNames = new ArrayList<>(tableFieldNames); - expectedFieldNames.addAll(streamFieldNames); - List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues); - expectedFieldValues.addAll(streamFieldValues); - Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues); - } - - @Test - public void testNullRecordWithInnerJoin() { - SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); - JoinRelType joinRelType = JoinRelType.INNER; - List<Integer> streamKeyIds = Arrays.asList(0, 1); - - SamzaSqlRelMessageJoinFunction joinFn = - new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames); - SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null); - Assert.assertNull(outMsg); - } - - @Test - public void testNullRecordWithLeftOuterJoin() { - SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); - JoinRelType joinRelType = JoinRelType.LEFT; - List<Integer> streamKeyIds = Arrays.asList(0, 1); - - SamzaSqlRelMessageJoinFunction joinFn = - new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, - tableFieldNames); - SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null); - - Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), - outMsg.getSamzaSqlRelRecord().getFieldNames().size()); - List<String> expectedFieldNames = new ArrayList<>(streamFieldNames); - expectedFieldNames.addAll(tableFieldNames); - List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues); - expectedFieldValues.addAll(tableFieldNames.stream().map( name -> null ).collect(Collectors.toList())); - Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues(), expectedFieldValues); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java new file mode 100644 index 0000000..93e6223 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java @@ -0,0 +1,46 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.data; + +import java.util.Arrays; +import java.util.List; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.junit.Assert; +import org.junit.Test; + + +public class TestSamzaSqlRelMessage { + + private List<Object> values = Arrays.asList("value1", "value2"); + private List<String> names = Arrays.asList("field1", "field2"); + + @Test + public void testGetField() { + SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); + Assert.assertEquals(values.get(0), message.getSamzaSqlRelRecord().getField(names.get(0)).get()); + Assert.assertEquals(values.get(1), message.getSamzaSqlRelRecord().getField(names.get(1)).get()); + } + + @Test + public void testGetNonExistentField() { + SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); + Assert.assertFalse(message.getSamzaSqlRelRecord().getField("field3").isPresent()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java new file mode 100644 index 0000000..dac5d02 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java @@ -0,0 +1,95 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.runner; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.SamzaException; +import org.apache.samza.config.MapConfig; +import org.apache.samza.sql.impl.ConfigBasedUdfResolver; +import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; +import org.apache.samza.sql.testutil.SamzaSqlTestConfig; +import org.junit.Assert; +import org.junit.Test; + + +public class TestSamzaSqlApplicationConfig { + + @Test + public void testConfigInit() { + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1"); + String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config"); + int numUdfs = config.get(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES).split(",").length; + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + Assert.assertEquals(1, samzaSqlApplicationConfig.getQueryInfo().size()); + Assert.assertEquals(numUdfs, samzaSqlApplicationConfig.getUdfMetadata().size()); + Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size()); + Assert.assertEquals(1, samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size()); + } + + @Test + public void testWrongConfigs() { + + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); + + + try { + // Fail because no SQL config + new SamzaSqlApplicationConfig(new MapConfig(config)); + Assert.fail(); + } catch (SamzaException e) { + } + + // Pass + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1"); + new SamzaSqlApplicationConfig(new MapConfig(config)); + testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_IO_RESOLVER); + testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_UDF_RESOLVER); + + String configIOResolverDomain = + String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config"); + String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", "testavro"); + + testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER); + + // Configs for the unused system "log" is not mandatory. + String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", "log"); + testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER); + } + + private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) { + Map<String, String> badConfigs = new HashMap<>(config); + badConfigs.remove(configKey); + new SamzaSqlApplicationConfig(new MapConfig(badConfigs)); + } + + private void testWithoutConfigShouldFail(Map<String, String> config, String configKey) { + Map<String, String> badConfigs = new HashMap<>(config); + badConfigs.remove(configKey); + try { + new SamzaSqlApplicationConfig(new MapConfig(badConfigs)); + Assert.fail(); + } catch (IllegalArgumentException e) { + // swallow + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java new file mode 100644 index 0000000..b6dcac5 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java @@ -0,0 +1,56 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.runner; + +import java.util.Map; + +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.RemoteApplicationRunner; +import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; +import org.apache.samza.sql.testutil.SamzaSqlTestConfig; +import org.junit.Assert; + +import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; +import org.junit.Test; + + +public class TestSamzaSqlApplicationRunner { + + @Test + public void testComputeSamzaConfigs() { + Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); + String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1"; + configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql1); + configs.put(SamzaSqlApplicationRunner.RUNNER_CONFIG, SamzaSqlApplicationRunner.class.getName()); + MapConfig samzaConfig = new MapConfig(configs); + Config newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(true, samzaConfig); + Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), LocalApplicationRunner.class.getName()); + // Check whether three new configs added. + Assert.assertEquals(newConfigs.size(), configs.size() + 3); + + newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(false, samzaConfig); + Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), RemoteApplicationRunner.class.getName()); + + // Check whether three new configs added. + Assert.assertEquals(newConfigs.size(), configs.size() + 3); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java new file mode 100644 index 0000000..a84f347 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java @@ -0,0 +1,58 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.testutil; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.List; + +import org.apache.samza.sql.testutil.SqlFileParser; +import org.junit.Assert; +import org.junit.Test; + + +public class TestSamzaSqlFileParser { + + public static final String TEST_SQL = + "insert into log.outputStream \n" + "\tselect * from brooklin.elasticsearchEnterpriseAccounts\n" + + "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, " + + "description as name3 from brooklin.elasticsearchEnterpriseAccounts\n" + "--insert into log.outputstream \n" + + "insert into log.outputstream \n" + "\n" + "\tselect id, MyTest(id) as id2 \n" + "\n" + + "\tfrom tracking.SamzaSqlTestTopic1_p8"; + + @Test + public void testParseSqlFile() throws IOException { + File tempFile = File.createTempFile("testparser", ""); + PrintWriter fileWriter = new PrintWriter(tempFile.getCanonicalPath()); + fileWriter.println(TEST_SQL); + fileWriter.close(); + + List<String> sqlStmts = SqlFileParser.parseSqlFile(tempFile.getAbsolutePath()); + Assert.assertEquals(3, sqlStmts.size()); + Assert.assertEquals("insert into log.outputStream select * from brooklin.elasticsearchEnterpriseAccounts", + sqlStmts.get(0)); + Assert.assertEquals( + "insert into log.outputstream select sfdcAccountId as key, organizationUrn as name2, description as name3 from brooklin.elasticsearchEnterpriseAccounts", + sqlStmts.get(1)); + Assert.assertEquals("insert into log.outputstream select id, MyTest(id) as id2 from tracking.SamzaSqlTestTopic1_p8", + sqlStmts.get(2)); + } +}
