Repository: samza
Updated Branches:
  refs/heads/samza-sql 6743df319 -> 6a40d5a9a


http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java 
b/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
new file mode 100644
index 0000000..b4b0e59
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+
+
+/**
+ * Example implementation of <code>SqlMessageCollector</code> that stores outputs from the operators
+ * in a key-value store, keyed by the name of the entity (stream or relation) the output belongs to.
+ */
+public class StoreMessageCollector implements SqlMessageCollector {
+
+  /** Maps an output entity name to the list of outputs collected under that name. */
+  private final KeyValueStore<EntityName, List<Object>> outputStore;
+
+  /**
+   * Creates a collector that saves every output it receives into the given store.
+   *
+   * @param store the key-value store that holds the collected outputs
+   */
+  public StoreMessageCollector(KeyValueStore<EntityName, List<Object>> store) {
+    this.outputStore = store;
+  }
+
+  /**
+   * Saves a relation under the relation's own name.
+   *
+   * @param deltaRelation the relation to save
+   */
+  @Override
+  public void send(Relation deltaRelation) throws Exception {
+    saveOutput(deltaRelation.getName(), deltaRelation);
+  }
+
+  /**
+   * Saves a tuple under the name of the stream the tuple belongs to.
+   *
+   * @param tuple the tuple to save
+   */
+  @Override
+  public void send(Tuple tuple) throws Exception {
+    saveOutput(tuple.getStreamName(), tuple);
+  }
+
+  @Override
+  public void timeout(List<EntityName> outputs) throws Exception {
+    // TODO Auto-generated method stub
+  }
+
+  /**
+   * Retrieves all outputs collected under the given name and removes them from the store.
+   *
+   * @param id the name of the entity whose outputs are requested
+   * @return the collected outputs; an empty list (never <code>null</code>) if nothing was collected
+   */
+  public List<Object> removeOutput(EntityName id) {
+    List<Object> output = outputStore.get(id);
+    outputStore.delete(id);
+    if (output == null) {
+      // hand back an empty list so that callers can iterate or call isEmpty() without a null check
+      return new ArrayList<Object>();
+    }
+    return output;
+  }
+
+  /**
+   * Appends one output to the list stored under the given name, creating the list on first use.
+   *
+   * @param name the entity name the output is stored under
+   * @param output the output object to append
+   */
+  private void saveOutput(EntityName name, Object output) {
+    List<Object> outputs = this.outputStore.get(name);
+    if (outputs == null) {
+      outputs = new ArrayList<Object>();
+    }
+    outputs.add(output);
+    // Always write the list back: the store is not guaranteed to return a live reference to the
+    // stored value, so mutating the result of get() alone may not be reflected in the store.
+    this.outputStore.put(name, outputs);
+  }
+
+  /**
+   * Saves an outgoing envelope under a stream name built as <code>system:stream</code> from the
+   * envelope's system stream.
+   *
+   * @param envelope the envelope to save
+   */
+  @Override
+  public void send(OutgoingMessageEnvelope envelope) {
+    saveOutput(
+        EntityName.getStreamName(envelope.getSystemStream().getSystem() + ":"
+            + envelope.getSystemStream().getStream()),
+        envelope);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java 
b/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
new file mode 100644
index 0000000..4ec7dbb
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.data.IncomingMessageTuple;
+import org.apache.samza.sql.operators.relation.Join;
+import org.apache.samza.sql.operators.window.BoundedTimeWindow;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.WindowableTask;
+
+
+/**
+ * This example illustrates a SQL join operation that joins two streams together using the following operations:
+ * <p>a. the two streams are each processed by a window operator to convert to relations
+ * <p>b. a join operator is applied on the two relations to generate join results
+ * <p>c. finally, the join results are sent out to the system output
+ *
+ */
+public class RandomOperatorTask implements StreamTask, InitableTask, WindowableTask {
+  private KeyValueStore<EntityName, List<Object>> opOutputStore;
+  private BoundedTimeWindow wndOp1;
+  private BoundedTimeWindow wndOp2;
+  private Join joinOp;
+
+  /**
+   * Selects the window operator that handles the given input stream.
+   *
+   * @param streamName the name of the input stream
+   * @return the window operator for <code>kafka:stream1</code> or <code>kafka:stream2</code>
+   * @throws IllegalArgumentException if the stream is neither of the two known inputs
+   */
+  private BoundedTimeWindow getWindowOp(EntityName streamName) {
+    if (streamName.equals(EntityName.getStreamName("kafka:stream1"))) {
+      return this.wndOp1;
+    } else if (streamName.equals(EntityName.getStreamName("kafka:stream2"))) {
+      return this.wndOp2;
+    }
+
+    throw new IllegalArgumentException("No window operator found for stream: " + streamName);
+  }
+
+  /**
+   * Feeds every relation in the list to the join operator.
+   *
+   * @param relations the window operator outputs to join; may be <code>null</code> or empty
+   * @param sqlCollector the collector that receives the join operator's outputs
+   */
+  private void joinRelations(List<Object> relations, StoreMessageCollector sqlCollector) throws Exception {
+    if (relations == null) {
+      // nothing was collected for this window operator
+      return;
+    }
+    for (Object input : relations) {
+      Relation relation = (Relation) input;
+      this.joinOp.process(relation, sqlCollector);
+    }
+  }
+
+  /**
+   * Sends every tuple of every join output relation to the <code>kafka</code> system stream
+   * <code>joinOutput1</code>.
+   *
+   * @param outputs the relations produced by the join operator; may be <code>null</code> or empty
+   * @param collector the task's message collector
+   */
+  private void processJoinOutput(List<Object> outputs, MessageCollector collector) {
+    if (outputs == null) {
+      // the join operator produced no output
+      return;
+    }
+    // get each tuple in the join operator's outputs and send it to system stream
+    for (Object joinOutput : outputs) {
+      KeyValueIterator<Object, Tuple> iter = ((Relation) joinOutput).all();
+      try {
+        while (iter.hasNext()) {
+          Tuple otuple = iter.next().getValue();
+          collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "joinOutput1"), otuple.getKey(),
+              otuple.getMessage()));
+        }
+      } finally {
+        // release the iterator even if sending fails part way through
+        iter.close();
+      }
+    }
+  }
+
+  /**
+   * Runs the incoming message through the matching window operator, joins any relations the window
+   * operator produced, and sends the join results out.
+   */
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    // create the StoreMessageCollector
+    StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore);
+
+    // construct the input tuple
+    IncomingMessageTuple ituple = new IncomingMessageTuple(envelope);
+
+    // based on tuple's stream name, get the window op and run process()
+    BoundedTimeWindow wndOp = getWindowOp(ituple.getStreamName());
+    wndOp.process(ituple, sqlCollector);
+    List<Object> wndOutputs = sqlCollector.removeOutput(wndOp.getSpec().getOutputNames().get(0));
+    if (wndOutputs == null || wndOutputs.isEmpty()) {
+      // the window operator has not emitted anything yet
+      return;
+    }
+
+    // process all output from the window operator
+    joinRelations(wndOutputs, sqlCollector);
+
+    // get the output from the join operator and send them
+    processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector);
+  }
+
+  /**
+   * Triggers the window callback on both window operators, joins whatever relations they emit, and
+   * sends the join results out.
+   */
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    // create the StoreMessageCollector
+    StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore);
+
+    // trigger timeout event on both window operators
+    this.wndOp1.window(sqlCollector, coordinator);
+    this.wndOp2.window(sqlCollector, coordinator);
+
+    // for all outputs from the window operators, call joinOp.process()
+    joinRelations(sqlCollector.removeOutput(this.wndOp1.getSpec().getOutputNames().get(0)), sqlCollector);
+    joinRelations(sqlCollector.removeOutput(this.wndOp2.getSpec().getOutputNames().get(0)), sqlCollector);
+
+    // get the output from the join operator and send them
+    processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector);
+  }
+
+  /**
+   * Creates the two window operators and the join operator, looks up the operator output store,
+   * and initializes all operators.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    // 1. create a fixed length 10 sec window operator
+    this.wndOp1 = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "relation1");
+    this.wndOp2 = new BoundedTimeWindow("wndOp2", 10, "kafka:stream2", "relation2");
+    // 2. create a join operation
+    List<String> inputRelations = new ArrayList<String>();
+    inputRelations.add("relation1");
+    inputRelations.add("relation2");
+    List<String> joinKeys = new ArrayList<String>();
+    joinKeys.add("key1");
+    joinKeys.add("key2");
+    this.joinOp = new Join("joinOp", inputRelations, "joinOutput", joinKeys);
+    // Finally, initialize all operators
+    this.opOutputStore =
+        (KeyValueStore<EntityName, List<Object>>) context.getStore("samza-sql-operator-output-kvstore");
+    this.wndOp1.init(config, context);
+    this.wndOp2.init(config, context);
+    this.joinOp.init(config, context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 
b/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
new file mode 100644
index 0000000..4796fa6
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.Operator;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.api.router.OperatorRouter;
+import org.apache.samza.sql.data.IncomingMessageTuple;
+import org.apache.samza.sql.operators.factory.SimpleOperatorFactoryImpl;
+import org.apache.samza.sql.operators.partition.PartitionOp;
+import org.apache.samza.sql.operators.partition.PartitionSpec;
+import org.apache.samza.sql.operators.relation.Join;
+import org.apache.samza.sql.operators.relation.JoinSpec;
+import org.apache.samza.sql.operators.stream.InsertStream;
+import org.apache.samza.sql.operators.stream.InsertStreamSpec;
+import org.apache.samza.sql.operators.window.BoundedTimeWindow;
+import org.apache.samza.sql.operators.window.WindowSpec;
+import org.apache.samza.sql.router.SimpleRouter;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.WindowableTask;
+
+
+/**
+ * This example illustrates a SQL join operation that joins two streams together using the following operations:
+ * <ul>
+ * <li>a. the two streams are each processed by a window operator to convert to relations</li>
+ * <li>b. a join operator is applied on the two relations to generate join results</li>
+ * <li>c. an istream operator is applied on join output and converts the relation into a stream</li>
+ * <li>d. a partition operator re-partitions the output stream from istream and sends the stream to system
+ * output</li>
+ * </ul>
+ *
+ * This example also uses an implementation of <code>SqlMessageCollector</code>
+ * (see {@link OperatorMessageCollector}) that uses <code>OperatorRouter</code> to automatically execute the
+ * whole paths that connect operators together.
+ */
+public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask {
+
+  /** Router that records which operators consume which streams and relations. */
+  private OperatorRouter rteCntx;
+
+  /**
+   * Wraps the incoming envelope in a tuple and passes it to every tuple operator that the router has
+   * registered for the tuple's stream.
+   */
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx);
+
+    IncomingMessageTuple ituple = new IncomingMessageTuple(envelope);
+    for (Iterator<TupleOperator> iter = this.rteCntx.getTupleOperators(ituple.getStreamName()).iterator();
+        iter.hasNext();) {
+      iter.next().process(ituple, opCollector);
+    }
+
+  }
+
+  /**
+   * Invokes the window callback on the operators that the router reports as next for each system input.
+   */
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx);
+
+    for (EntityName entity : this.rteCntx.getSystemInputs()) {
+      for (Iterator<Operator> iter = this.rteCntx.getNextOperators(entity).iterator(); iter.hasNext();) {
+        iter.next().window(opCollector, coordinator);
+      }
+    }
+
+  }
+
+  /**
+   * Builds the operator specifications, creates the operators through the operator factory, wires them
+   * together in the router, and initializes every operator registered with the router.
+   */
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    // create specification of all operators first
+    // 1. create 2 window specifications that define 2 windows of fixed length of 10 seconds
+    final WindowSpec spec1 =
+        new WindowSpec("fixedWnd1", EntityName.getStreamName("inputStream1"),
+            EntityName.getRelationName("fixedWndOutput1"), 10);
+    final WindowSpec spec2 =
+        new WindowSpec("fixedWnd2", EntityName.getStreamName("inputStream2"),
+            EntityName.getRelationName("fixedWndOutput2"), 10);
+    // 2. create a join specification that join the output from 2 window operators together
+    // (plain lists instead of double-brace initialization, which would create anonymous ArrayList subclasses)
+    List<EntityName> inputRelations = new ArrayList<EntityName>();
+    inputRelations.add(spec1.getOutputName());
+    inputRelations.add(spec2.getOutputName());
+    List<String> joinKeys = new ArrayList<String>();
+    joinKeys.add("key1");
+    joinKeys.add("key2");
+    JoinSpec joinSpec = new JoinSpec("joinOp", inputRelations, EntityName.getRelationName("joinOutput"), joinKeys);
+    // 3. create the specification of an istream operator that convert the output from join to a stream
+    InsertStreamSpec istrmSpec =
+        new InsertStreamSpec("istremOp", joinSpec.getOutputName(), EntityName.getStreamName("istrmOutput1"));
+    // 4. create the specification of a partition operator that re-partitions the stream based on
+    // <code>joinKey</code>
+    PartitionSpec parSpec =
+        new PartitionSpec("parOp1", istrmSpec.getOutputName().getName(), new SystemStream("kafka", "parOutputStrm1"),
+            "joinKey", 50);
+
+    // create all operators via the operator factory
+    // 1. create two window operators
+    SimpleOperatorFactoryImpl operatorFactory = new SimpleOperatorFactoryImpl();
+    BoundedTimeWindow wnd1 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec1);
+    BoundedTimeWindow wnd2 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec2);
+    // 2. create one join operator
+    Join join = (Join) operatorFactory.getRelationOperator(joinSpec);
+    // 3. create one stream operator
+    InsertStream istream = (InsertStream) operatorFactory.getRelationOperator(istrmSpec);
+    // 4. create a re-partition operator
+    PartitionOp par = (PartitionOp) operatorFactory.getTupleOperator(parSpec);
+
+    // Now, connecting the operators via the OperatorRouter
+    this.rteCntx = new SimpleRouter();
+    // 1. set two system input operators (i.e. two window operators)
+    this.rteCntx.addTupleOperator(spec1.getInputName(), wnd1);
+    this.rteCntx.addTupleOperator(spec2.getInputName(), wnd2);
+    // 2. connect join operator to both window operators
+    this.rteCntx.addRelationOperator(spec1.getOutputName(), join);
+    this.rteCntx.addRelationOperator(spec2.getOutputName(), join);
+    // 3. connect stream operator to the join operator
+    this.rteCntx.addRelationOperator(joinSpec.getOutputName(), istream);
+    // 4. connect re-partition operator to the stream operator
+    this.rteCntx.addTupleOperator(istrmSpec.getOutputName(), par);
+    // 5. set the system inputs
+    this.rteCntx.addSystemInput(spec1.getInputName());
+    this.rteCntx.addSystemInput(spec2.getInputName());
+
+    for (Iterator<Operator> iter = this.rteCntx.iterator(); iter.hasNext();) {
+      iter.next().init(config, context);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index bb07a3b..08e548c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -26,7 +26,8 @@ include \
   'samza-log4j',
   'samza-shell',
   'samza-yarn',
-  'samza-test'
+  'samza-test',
+  'samza-sql'
 
 rootProject.children.each {
   if (it.name != 'samza-api' && it.name != 'samza-shell' && it.name != 
'samza-log4j') {

Reply via email to