Repository: samza Updated Branches: refs/heads/samza-sql 6743df319 -> 6a40d5a9a
http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java b/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java new file mode 100644 index 0000000..b4b0e59 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task.sql; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.data.Relation; +import org.apache.samza.sql.api.data.Tuple; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.OutgoingMessageEnvelope; + + +/** + * Example implementation of <code>SqlMessageCollector</code> that stores outputs from the operators + * + */ +public class StoreMessageCollector implements SqlMessageCollector { + + private final KeyValueStore<EntityName, List<Object>> outputStore; + + public StoreMessageCollector(KeyValueStore<EntityName, List<Object>> store) { + this.outputStore = store; + } + + @Override + public void send(Relation deltaRelation) throws Exception { + saveOutput(deltaRelation.getName(), deltaRelation); + } + + @Override + public void send(Tuple tuple) throws Exception { + saveOutput(tuple.getStreamName(), tuple); + } + + @Override + public void timeout(List<EntityName> outputs) throws Exception { + // TODO Auto-generated method stub + } + + public List<Object> removeOutput(EntityName id) { + List<Object> output = outputStore.get(id); + outputStore.delete(id); + return output; + } + + private void saveOutput(EntityName name, Object output) { + if (this.outputStore.get(name) == null) { + this.outputStore.put(name, new ArrayList<Object>()); + } + List<Object> outputs = this.outputStore.get(name); + outputs.add(output); + } + + @Override + public void send(OutgoingMessageEnvelope envelope) { + saveOutput( + EntityName.getStreamName(envelope.getSystemStream().getSystem() + ":" + envelope.getSystemStream().getStream()), + envelope); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java b/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java new file mode 100644 index 0000000..4ec7dbb --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task.sql; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.samza.config.Config; +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.data.Relation; +import org.apache.samza.sql.api.data.Tuple; +import org.apache.samza.sql.data.IncomingMessageTuple; +import org.apache.samza.sql.operators.relation.Join; +import org.apache.samza.sql.operators.window.BoundedTimeWindow; +import org.apache.samza.storage.kv.KeyValueIterator; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.WindowableTask; + + +/*** + * This example illustrate a SQL join operation that joins two streams together using the following operations: + * <p>a. the two streams are each processed by a window operator to convert to relations + * <p>b. a join operator is applied on the two relations to generate join results + * <p>c. finally, the join results are sent out to the system output + * + */ +public class RandomOperatorTask implements StreamTask, InitableTask, WindowableTask { + private KeyValueStore<EntityName, List<Object>> opOutputStore; + private BoundedTimeWindow wndOp1; + private BoundedTimeWindow wndOp2; + private Join joinOp; + + private BoundedTimeWindow getWindowOp(EntityName streamName) { + if (streamName.equals(EntityName.getStreamName("kafka:stream1"))) { + return this.wndOp1; + } else if (streamName.equals(EntityName.getStreamName("kafka:stream2"))) { + return this.wndOp2; + } + + throw new IllegalArgumentException("No window operator found for stream: " + streamName); + } + + private void processJoinOutput(List<Object> outputs, MessageCollector collector) { + // get each tuple in the join operator's outputs and send it to system stream + for (Object joinOutput : outputs) { + for (KeyValueIterator<Object, Tuple> iter = ((Relation) joinOutput).all(); iter.hasNext();) { + Tuple otuple = iter.next().getValue(); + collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "joinOutput1"), otuple.getKey(), otuple + .getMessage())); + } + } + } + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) + throws Exception { + // create the StoreMessageCollector + StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore); + + // construct the input tuple + IncomingMessageTuple ituple = new IncomingMessageTuple(envelope); + + // based on tuple's stream name, get the window op and run process() + BoundedTimeWindow wndOp = getWindowOp(ituple.getStreamName()); + wndOp.process(ituple, sqlCollector); + List<Object> wndOutputs = sqlCollector.removeOutput(wndOp.getSpec().getOutputNames().get(0)); + if (wndOutputs.isEmpty()) { + return; + } + + // process all output from the window operator + for (Object input : wndOutputs) { + Relation relation = (Relation) input; + this.joinOp.process(relation, sqlCollector); + } + // get the output from the join operator and send them + processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector); + + } + + @Override + public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { + // create the StoreMessageCollector + StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore); + + // trigger timeout event on both window operators + this.wndOp1.window(sqlCollector, coordinator); + this.wndOp2.window(sqlCollector, coordinator); + + // for all outputs from the window operators, call joinOp.process() + for (Object input : sqlCollector.removeOutput(this.wndOp1.getSpec().getOutputNames().get(0))) { + Relation relation = (Relation) input; + this.joinOp.process(relation, sqlCollector); + } + for (Object input : sqlCollector.removeOutput(this.wndOp2.getSpec().getOutputNames().get(0))) { + Relation relation = (Relation) input; + this.joinOp.process(relation, sqlCollector); + } + + // get the output from the join operator and send them + processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector); + } + + @SuppressWarnings("unchecked") + @Override + public void init(Config config, TaskContext context) throws Exception { + // 1. create a fixed length 10 sec window operator + this.wndOp1 = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "relation1"); + this.wndOp2 = new BoundedTimeWindow("wndOp2", 10, "kafka:stream2", "relation2"); + // 2. create a join operation + List<String> inputRelations = new ArrayList<String>(); + inputRelations.add("relation1"); + inputRelations.add("relation2"); + List<String> joinKeys = new ArrayList<String>(); + joinKeys.add("key1"); + joinKeys.add("key2"); + this.joinOp = new Join("joinOp", inputRelations, "joinOutput", joinKeys); + // Finally, initialize all operators + this.opOutputStore = + (KeyValueStore<EntityName, List<Object>>) context.getStore("samza-sql-operator-output-kvstore"); + this.wndOp1.init(config, context); + this.wndOp2.init(config, context); + this.joinOp.init(config, context); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java b/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java new file mode 100644 index 0000000..4796fa6 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task.sql; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.samza.config.Config; +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.operators.Operator; +import org.apache.samza.sql.api.operators.TupleOperator; +import org.apache.samza.sql.api.router.OperatorRouter; +import org.apache.samza.sql.data.IncomingMessageTuple; +import org.apache.samza.sql.operators.factory.SimpleOperatorFactoryImpl; +import org.apache.samza.sql.operators.partition.PartitionOp; +import org.apache.samza.sql.operators.partition.PartitionSpec; +import org.apache.samza.sql.operators.relation.Join; +import org.apache.samza.sql.operators.relation.JoinSpec; +import org.apache.samza.sql.operators.stream.InsertStream; +import org.apache.samza.sql.operators.stream.InsertStreamSpec; +import org.apache.samza.sql.operators.window.BoundedTimeWindow; +import org.apache.samza.sql.operators.window.WindowSpec; +import org.apache.samza.sql.router.SimpleRouter; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.WindowableTask; + + +/*** + * This example illustrate a SQL join operation that joins two streams together using the folowing operations: + * <ul> + * <li>a. the two streams are each processed by a window operator to convert to relations + * <li>b. a join operator is applied on the two relations to generate join results + * <li>c. an istream operator is applied on join output and convert the relation into a stream + * <li>d. a partition operator that re-partitions the output stream from istream and send the stream to system output + * </ul> + * + * This example also uses an implementation of <code>SqlMessageCollector</code> (@see <code>OperatorMessageCollector</code>) + * that uses <code>OperatorRouter</code> to automatically execute the whole paths that connects operators together. + */ +public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask { + + private OperatorRouter rteCntx; + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) + throws Exception { + SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx); + + IncomingMessageTuple ituple = new IncomingMessageTuple(envelope); + for (Iterator<TupleOperator> iter = this.rteCntx.getTupleOperators(ituple.getStreamName()).iterator(); iter + .hasNext();) { + iter.next().process(ituple, opCollector); + } + + } + + @Override + public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { + SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx); + + for (EntityName entity : this.rteCntx.getSystemInputs()) { + for (Iterator<Operator> iter = this.rteCntx.getNextOperators(entity).iterator(); iter.hasNext();) { + iter.next().window(opCollector, coordinator); + } + } + + } + + @Override + public void init(Config config, TaskContext context) throws Exception { + // create specification of all operators first + // 1. create 2 window specifications that define 2 windows of fixed length of 10 seconds + final WindowSpec spec1 = + new WindowSpec("fixedWnd1", EntityName.getStreamName("inputStream1"), + EntityName.getRelationName("fixedWndOutput1"), 10); + final WindowSpec spec2 = + new WindowSpec("fixedWnd2", EntityName.getStreamName("inputStream2"), + EntityName.getRelationName("fixedWndOutput2"), 10); + // 2. create a join specification that join the output from 2 window operators together + @SuppressWarnings("serial") + List<EntityName> inputRelations = new ArrayList<EntityName>() { + { + add(spec1.getOutputName()); + add(spec2.getOutputName()); + } + }; + @SuppressWarnings("serial") + List<String> joinKeys = new ArrayList<String>() { + { + add("key1"); + add("key2"); + } + }; + JoinSpec joinSpec = new JoinSpec("joinOp", inputRelations, EntityName.getRelationName("joinOutput"), joinKeys); + // 3. create the specification of an istream operator that convert the output from join to a stream + InsertStreamSpec istrmSpec = + new InsertStreamSpec("istremOp", joinSpec.getOutputName(), EntityName.getStreamName("istrmOutput1")); + // 4. create the specification of a partition operator that re-partitions the stream based on <code>joinKey</code> + PartitionSpec parSpec = + new PartitionSpec("parOp1", istrmSpec.getOutputName().getName(), new SystemStream("kafka", "parOutputStrm1"), + "joinKey", 50); + + // create all operators via the operator factory + // 1. create two window operators + SimpleOperatorFactoryImpl operatorFactory = new SimpleOperatorFactoryImpl(); + BoundedTimeWindow wnd1 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec1); + BoundedTimeWindow wnd2 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec2); + // 2. create one join operator + Join join = (Join) operatorFactory.getRelationOperator(joinSpec); + // 3. create one stream operator + InsertStream istream = (InsertStream) operatorFactory.getRelationOperator(istrmSpec); + // 4. create a re-partition operator + PartitionOp par = (PartitionOp) operatorFactory.getTupleOperator(parSpec); + + // Now, connecting the operators via the OperatorRouter + this.rteCntx = new SimpleRouter(); + // 1. set two system input operators (i.e. two window operators) + this.rteCntx.addTupleOperator(spec1.getInputName(), wnd1); + this.rteCntx.addTupleOperator(spec2.getInputName(), wnd2); + // 2. connect join operator to both window operators + this.rteCntx.addRelationOperator(spec1.getOutputName(), join); + this.rteCntx.addRelationOperator(spec2.getOutputName(), join); + // 3. connect stream operator to the join operator + this.rteCntx.addRelationOperator(joinSpec.getOutputName(), istream); + // 4. connect re-partition operator to the stream operator + this.rteCntx.addTupleOperator(istrmSpec.getOutputName(), par); + // 5. set the system inputs + this.rteCntx.addSystemInput(spec1.getInputName()); + this.rteCntx.addSystemInput(spec2.getInputName()); + + for (Iterator<Operator> iter = this.rteCntx.iterator(); iter.hasNext();) { + iter.next().init(config, context); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index bb07a3b..08e548c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -26,7 +26,8 @@ include \ 'samza-log4j', 'samza-shell', 'samza-yarn', - 'samza-test' + 'samza-test', + 'samza-sql' rootProject.children.each { if (it.name != 'samza-api' && it.name != 'samza-shell' && it.name != 'samza-log4j') {
