SAMZA-482; create samza-sql module, and add a basic set of non-functional 
operators into it


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d4861df4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d4861df4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d4861df4

Branch: refs/heads/samza-sql
Commit: d4861df4d4cd37f2d5ddc2db5e3158426de4139c
Parents: 6743df3
Author: Yi Pan <[email protected]>
Authored: Thu Feb 12 14:27:29 2015 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Feb 12 14:27:29 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  20 +++
 gradle/dependency-versions.gradle               |   1 +
 samza-sql/README                                |   1 +
 .../apache/samza/sql/api/data/EntityName.java   | 141 ++++++++++++++++
 .../org/apache/samza/sql/api/data/Relation.java |  47 ++++++
 .../org/apache/samza/sql/api/data/Tuple.java    |  58 +++++++
 .../samza/sql/api/operators/Operator.java       |  43 +++++
 .../sql/api/operators/RelationOperator.java     |  51 ++++++
 .../sql/api/operators/SqlOperatorFactory.java   |  51 ++++++
 .../samza/sql/api/operators/TupleOperator.java  |  47 ++++++
 .../sql/api/operators/spec/OperatorSpec.java    |  64 ++++++++
 .../samza/sql/api/router/OperatorRouter.java    | 126 +++++++++++++++
 .../samza/sql/data/IncomingMessageTuple.java    |  74 +++++++++
 .../sql/operators/factory/SimpleOperator.java   |  50 ++++++
 .../factory/SimpleOperatorFactoryImpl.java      |  63 ++++++++
 .../operators/factory/SimpleOperatorSpec.java   | 106 +++++++++++++
 .../sql/operators/partition/PartitionOp.java    |  90 +++++++++++
 .../sql/operators/partition/PartitionSpec.java  |  91 +++++++++++
 .../samza/sql/operators/relation/Join.java      | 139 ++++++++++++++++
 .../samza/sql/operators/relation/JoinSpec.java  |  60 +++++++
 .../sql/operators/stream/InsertStream.java      |  98 ++++++++++++
 .../sql/operators/stream/InsertStreamSpec.java  |  42 +++++
 .../sql/operators/window/BoundedTimeWindow.java | 141 ++++++++++++++++
 .../samza/sql/operators/window/WindowSpec.java  |  67 ++++++++
 .../samza/sql/operators/window/WindowState.java |  44 +++++
 .../apache/samza/sql/router/SimpleRouter.java   | 133 ++++++++++++++++
 .../task/sql/OperatorMessageCollector.java      |  80 ++++++++++
 .../samza/task/sql/SqlMessageCollector.java     |  64 ++++++++
 .../samza/task/sql/StoreMessageCollector.java   |  80 ++++++++++
 .../samza/task/sql/RandomOperatorTask.java      | 151 ++++++++++++++++++
 .../apache/samza/task/sql/StreamSqlTask.java    | 159 +++++++++++++++++++
 settings.gradle                                 |   3 +-
 32 files changed, 2384 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index b803276..e6b10fc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -246,6 +246,26 @@ project(":samza-yarn_$scalaVersion") {
   jar.dependsOn("lesscss")
 }
 
+project(":samza-sql_$scalaVersion") {
+  apply plugin: 'java'
+
+  configurations {
+    // Remove transitive dependencies from Zookeeper that we don't want.
+    compile.exclude group: 'javax.jms', module: 'jms'
+    compile.exclude group: 'com.sun.jdmk', module: 'jmxtools'
+    compile.exclude group: 'com.sun.jmx', module: 'jmxri'
+  }
+
+  dependencies {
+    compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
+    compile project(":samza-kv_$scalaVersion")
+    compile "commons-collections:commons-collections:$commonsCollectionVersion"
+    testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
+  }
+}
+
 project(":samza-shell") {
   apply plugin: 'java'
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index 84be50b..6f815b2 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -34,4 +34,5 @@
   log4jVersion = "1.2.17"
   guavaVersion = "17.0"
   commonsCodecVersion = "1.9"
+  commonsCollectionVersion = "3.2.1"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/README
----------------------------------------------------------------------
diff --git a/samza-sql/README b/samza-sql/README
new file mode 100644
index 0000000..65b7558
--- /dev/null
+++ b/samza-sql/README
@@ -0,0 +1 @@
+samza-sql is an experimental module that is under development (SAMZA-390).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java 
b/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java
new file mode 100644
index 0000000..127a677
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * This class defines the name scheme for the collective data entities in 
Samza Stream SQL, i.e. relations and streams.
+ */
+public class EntityName {
+  /**
+   * <code>EntityType</code> defines the types of the entity names
+   *
+   */
+  private enum EntityType {
+    RELATION,
+    STREAM
+  };
+
+  /**
+   * Type of the entity name
+   */
+  private final EntityType type;
+
+  /**
+   * Formatted name of the entity.
+   *
+   * <p>This formatted name of the entity should be unique identifier for the 
corresponding relation/stream in the system.
+   * e.g. for a Kafka system stream named "mystream", the formatted name 
should be "kafka:mystream".
+   */
+  private final String name;
+
+  //TODO: we may want to replace the map with Guava cache to allow GC
+  /**
+   * Static map of already allocated relation names
+   */
+  private static Map<String, EntityName> relations = new HashMap<String, 
EntityName>();
+
+  /**
+   * Static map of already allocated stream names
+   */
+  private static Map<String, EntityName> streams = new HashMap<String, 
EntityName>();
+
+  /**
+   * Private ctor to create entity names
+   *
+   * @param type Type of the entity name
+   * @param name Formatted name of the entity
+   */
+  private EntityName(EntityType type, String name) {
+    this.type = type;
+    this.name = name;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s:%s", this.type, this.name);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof EntityName) {
+      EntityName otherEntity = (EntityName) other;
+      return this.type.equals(otherEntity.type) && 
this.name.equals(otherEntity.name);
+    }
+    return false;
+  }
+
+  /**
+   * Check to see whether this entity name is for a relation
+   *
+   * @return true if the entity type is <code>EntityType.RELATION</code>; 
false otherwise
+   */
+  public boolean isRelation() {
+    return this.type.equals(EntityType.RELATION);
+  }
+
+  /**
+   * Check to see whether this entity name is for a stream
+   *
+   * @return true if the entity type is <code>EntityType.STREAM</code>; false 
otherwise
+   */
+  public boolean isStream() {
+    return this.type.equals(EntityType.STREAM);
+  }
+
+  /**
+   * Get the formatted entity name
+   *
+   * @return The formatted entity name
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * Static method to get the instance of <code>EntityName</code> with type 
<code>EntityType.RELATION</code>
+   *
+   * @param name The formatted entity name of the relation
+   * @return A <code>EntityName</code> for a relation
+   */
+  public static EntityName getRelationName(String name) {
+    if (relations.get(name) == null) {
+      relations.put(name, new EntityName(EntityType.RELATION, name));
+    }
+    return relations.get(name);
+  }
+
+  /**
+   * Static method to get the instance of <code>EntityName</code> with type 
<code>EntityType.STREAM</code>
+   *
+   * @param name The formatted entity name of the stream
+   * @return A <code>EntityName</code> for a stream
+   */
+  public static EntityName getStreamName(String name) {
+    if (streams.get(name) == null) {
+      streams.put(name, new EntityName(EntityType.STREAM, name));
+    }
+    return streams.get(name);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java 
b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java
new file mode 100644
index 0000000..90b8026
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+import org.apache.samza.storage.kv.KeyValueStore;
+
+
+/**
+ * This class defines the general interface of <code>Relation</code>, which is 
defined as a map of <code>Tuple</code>.
+ *
+ * <p>The interface is defined as an extension to 
<code>KeyValueStore&lt;Object, Tuple&gt;</code>.
+ *
+ */
+
+public interface Relation extends KeyValueStore<Object, Tuple> {
+
+  /**
+   * Get the primary key field name for this table
+   *
+   * @return The name of the primary key field
+   */
+  String getPrimaryKey();
+
+  /**
+   * Get the name of the relation created by CREATE TABLE
+   *
+   * @return The relation name
+   */
+  EntityName getName();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java 
b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
new file mode 100644
index 0000000..0c21a53
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+/**
+ * This class defines the generic interface of <code>Tuple</code>, which is a 
entry from the incoming stream, or one row in a <code>Relation</code>.
+ *
+ * <p>The <code>Tuple</code> models the basic operatible unit in streaming SQL 
processes in Samza.
+ *
+ */
+public interface Tuple {
+
+  /**
+   * Access method to get the corresponding message body in the tuple
+   *
+   * @return Message object in the tuple
+   */
+  Object getMessage();
+
+  /**
+   * Method to indicate whether the tuple is a delete tuple or an insert tuple
+   *
+   * @return A boolean value indicates whether the current tuple is a delete 
or insert message
+   */
+  boolean isDelete();
+
+  /**
+   * Access method to the key of the tuple
+   *
+   * @return The <code>key</code> of the tuple
+   */
+  Object getKey();
+
+  /**
+   * Get the stream name of the tuple. Note this stream name should be unique 
in the system.
+   *
+   * @return The stream name which this tuple belongs to
+   */
+  EntityName getStreamName();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java
new file mode 100644
index 0000000..0169f2d
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators;
+
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.WindowableTask;
+
+
+/**
+ * This class defines the common interface for operator classes, no matter 
what input data are.
+ *
+ * <p> It extends the <code>InitableTask</code> and 
<code>WindowableTask</code> to reuse the interface methods
+ * <code>init</code> and <code>window</code> for initialization and timeout 
operations
+ *
+ */
+public interface Operator extends InitableTask, WindowableTask {
+
+  /**
+   * Method to the specification of this <code>Operator</code>
+   *
+   * @return The <code>OperatorSpec</code> object that defines the 
configuration/parameters of the operator
+   */
+  OperatorSpec getSpec();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
new file mode 100644
index 0000000..faa0a32
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.task.sql.SqlMessageCollector;
+
+
+/**
+ * This class defines the interface <code>RelationOperator</code>.
+ *
+ * <p>All operators implementing <code>RelationOperator</code> will take a 
<code>Relation</code> object as input.
+ * The SQL operators that need to implement this interface include:
+ * <ul>
+ * <li>All relation algebra operators, such as: join, select, where, group-by, 
having, limit, order-by, etc.
+ * <li>All relation-to-stream operators, which converts a relation to a stream
+ * </ul>
+ *
+ */
+public interface RelationOperator extends Operator {
+
+  /**
+   * Method to perform a relational algebra on a set of relations, or a 
relation-to-stream function
+   *
+   * <p> The actual implementation of relational logic is performed by the 
implementation of this method.
+   * The <code>collector</code> object is used by the operator to send their 
output to
+   *
+   * @param deltaRelation The changed rows in the input relation, including 
the inserts/deletes/updates
+   * @param collector The <code>SqlMessageCollector</code> object that accepts 
outputs from the operator
+   * @throws Exception Throws exception if failed
+   */
+  void process(Relation deltaRelation, SqlMessageCollector collector) throws 
Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
new file mode 100644
index 0000000..67671b9
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators;
+
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+
+
+/**
+ * This class defines the interface of SQL operator factory, which creates the 
following operators:
+ * <ul>
+ * <li><code>RelationOperator</code> that takes <code>Relation</code> as input 
variables
+ * <li><code>TupleOperator</code> that takes <code>Tuple</code> as input 
variables
+ * </ul>
+ *
+ */
+public interface SqlOperatorFactory {
+
+  /**
+   * Interface method to create/get the <code>RelationOperator</code> object
+   *
+   * @param spec The specification of the <code>RelationOperator</code> object
+   * @return The relation operator object
+   */
+  RelationOperator getRelationOperator(OperatorSpec spec);
+
+  /**
+   * Interface method to create/get the <code>TupleOperator</code> object
+   *
+   * @param spec The specification of the <code>TupleOperator</code> object
+   * @return The tuple operator object
+   */
+  TupleOperator getTupleOperator(OperatorSpec spec);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
new file mode 100644
index 0000000..ac4654e
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators;
+
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.task.sql.SqlMessageCollector;
+
+
+/**
+ * This class defines the interface class that processes incoming tuples from 
input stream(s).
+ *
+ * <p>All operators implementing <code>TupleOperator</code> will take a 
<code>Tuple</code> object as input.
+ * The SQL operators that need to implement this interface include:
+ * <ul>
+ * <li>All stream-to-relation operators, such as: window operators.
+ * <li>All stream-to-stream operators, such as: re-partition, union of two 
streams
+ * </ul>
+ *
+ */
+public interface TupleOperator extends Operator {
+  /**
+   * Interface method to process on an input tuple.
+   *
+   * @param tuple The input tuple, which has the incoming message from a stream
+   * @param collector The <code>SqlMessageCollector</code> object that accepts 
outputs from the operator
+   * @throws Exception Throws exception if failed
+   */
+  void process(Tuple tuple, SqlMessageCollector collector) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
new file mode 100644
index 0000000..96385e2
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators.spec;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+
+
+/**
+ * This class defines a generic specification interface class for all 
operators.
+ *
+ * <p>The purpose of this class is to encapsulate all the details of 
configuration/parameters of a specific implementation of an operator.
+ *
+ * <p>The generic methods for an operator specification is to provide methods 
to get the unique ID, the list of entity names (i.e. stream name
+ * in <code>Tuple</code> or <code>Relation</code> name) of input variables , 
and the list of entity names of the output variables.
+ *
+ */
+public interface OperatorSpec {
+  /**
+   * Interface method that returns the unique ID of the operator in a task
+   *
+   * @return The unique ID of the <code>Operator</code> object
+   */
+  String getId();
+
+  /**
+   * Access method to the list of entity names of input variables.
+   *
+   * <p>The input entity names are either stream names if the operator is a 
<code>TupleOperator</code>;
+   * or <code>Relation</code> names if the operator is a 
<code>RelationOperator</code>
+   *
+   * @return A list of entity names of the inputs
+   */
+  List<EntityName> getInputNames();
+
+  /**
+   * Access method to the list of entity name of the output variable
+   *
+   * <p>The output entity name is either a stream name if the operator 
generates tuples as an output stream;
+   * or <code>Relation</code> names if the operator generates a 
<code>Relation</code> as output.
+   *
+   * @return The entity name of the output
+   *
+   */
+  List<EntityName> getOutputNames();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
new file mode 100644
index 0000000..2455a62
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.router;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.Operator;
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.api.operators.TupleOperator;
+
+
+/**
+ * This interface class defines interface methods to connect operators 
together.
+ *
+ * <p>The <code>OperatorRouter</code> allows the user to attach operators to a 
relation or a stream entity,
+ * if the corresponding relation/stream is included as inputs to the operator. 
Each operator then executes its own logic
+ * and determines which relation/stream to emit the output to. Through the 
<code>OperatorRouter</code>, the next
+ * operators attached to the corresponding output entities (i.e. 
relations/streams) can then be invoked to continue the
+ * stream process task.
+ *
+ * <p>The <code>OperatorRouter</code> also allows the user to set the system 
input entities (i.e. relations/streams)
+ * that are fed into the operators by the system outside the 
<code>OperatorRouter</code>, not generated by some
+ * operators in the <code>OperatorRouter</code>.
+ *
+ * <p>The methods included in this interface class allow a user to
+ * <ul>
+ * <li>i)   add operators to an <code>EntityName</code>
+ * <li>ii)  get the next operators attached to an <code>EntityName</code>
+ * <li>iii) add and get the system input <code>EntityName</code>s
+ * <li>iv)  iterate through each and every operator connected via 
<code>OperatorRouter</code>
+ * </ul>
+ *
+ */
+public interface OperatorRouter {
+
+  /**
+   * This method adds a <code>TupleOperator</code> as one of the input 
operators.
+   *
+   * @param stream The output stream entity name
+   * @param nextOp The <code>TupleOperator</code> that takes the tuples in the 
<code>stream</code> as an input.
+   * @throws Exception Throws exception if failed
+   */
+  void addTupleOperator(EntityName stream, TupleOperator nextOp) throws 
Exception;
+
+  /**
+   * This method adds a <code>RelationOperator</code> as one of the input 
operators
+
+   * @param relation The input relation entity name
+   * @param nextOp The <code>RelationOperator</code> that takes the 
<code>relation</code> as an input
+   * @throws Exception Throws exception if failed
+   */
+  void addRelationOperator(EntityName relation, RelationOperator nextOp) 
throws Exception;
+
+  /**
+   * This method gets the list of <code>RelationOperator</code>s attached to 
the <code>relation</code>
+   *
+   * @param relation The identifier of the relation entity
+   * @return The list of <code>RelationOperator</code> taking 
<code>relation</code> as an input variable
+   */
+  List<RelationOperator> getRelationOperators(EntityName relation);
+
+  /**
+   * This method gets the list of <code>TupleOperator</code>s attached to the 
<code>stream</code>
+   *
+   * @param stream The identifier of the stream entity
+   * @return The list of <code>TupleOperator</code> taking <code>stream</code> 
as an input variable
+   */
+  List<TupleOperator> getTupleOperators(EntityName stream);
+
+  /**
+   * This method gets the list of <code>Operator</code>s attached to an output 
entity (of any type)
+   *
+   * @param output The identifier of the output entity
+   * @return The list of <code>Operator</code> taking <code>output</code> as 
input variables
+   */
+  List<Operator> getNextOperators(EntityName output);
+
+  /**
+   * This method provides an iterator to go through all operators connected 
via <code>OperatorRouter</code>
+   *
+   * @return An <code>Iterator</code> for all operators connected via 
<code>OperatorRouter</code>
+   */
+  Iterator<Operator> iterator();
+
+  /**
+   * This method checks to see whether there is any <code>Operator</code> 
attached to the entity <code>output</code>
+   *
+   * @param output The output entity name
+   * @return True if there is some operator attached to the 
<code>output</code>; false otherwise
+   */
+  boolean hasNextOperators(EntityName output);
+
+  /**
+   * This method adds an entity as the system input
+   *
+   * @param input The entity name for the system input
+   */
+  void addSystemInput(EntityName input);
+
+  /**
+   * This method returns the list of entities as system inputs
+   *
+   * @return The list of <code>EntityName</code>s as system inputs
+   */
+  List<EntityName> getSystemInputs();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
new file mode 100644
index 0000000..a8a55e2
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.data;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+/**
+ * This class implements a <code>Tuple</code> class that encapsulates 
<code>IncomingMessageEnvelope</code> from the system
+ *
+ */
+public class IncomingMessageTuple implements Tuple {
+  /**
+   * Incoming message envelope
+   */
+  private final IncomingMessageEnvelope imsg;
+
+  /**
+   * The entity name for the incoming system stream
+   */
+  private final EntityName strmEntity;
+
+  /**
+   * Ctor to create a <code>IncomingMessageTuple</code> from 
<code>IncomingMessageEnvelope</code>
+   *
+   * @param imsg The incoming system message
+   */
+  public IncomingMessageTuple(IncomingMessageEnvelope imsg) {
+    this.imsg = imsg;
+    this.strmEntity =
+        EntityName.getStreamName(String.format("%s:%s", 
imsg.getSystemStreamPartition().getSystem(), imsg
+            .getSystemStreamPartition().getStream()));
+  }
+
+  // TODO: the return type should be changed to the generic data type
+  @Override
+  public Object getMessage() {
+    return this.imsg.getMessage();
+  }
+
+  @Override
+  public boolean isDelete() {
+    return false;
+  }
+
+  @Override
+  public Object getKey() {
+    return imsg.getKey();
+  }
+
+  @Override
+  public EntityName getStreamName() {
+    return this.strmEntity;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
new file mode 100644
index 0000000..c634159
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.factory;
+
+import org.apache.samza.sql.api.operators.Operator;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+
+
+/**
+ * An abstract class that encapsulate the basic information and methods that 
all operator classes should implement.
+ *
+ */
+public abstract class SimpleOperator implements Operator {
+  /**
+   * The specification of this operator
+   */
+  private final OperatorSpec spec;
+
+  /**
+   * Ctor of <code>SimpleOperator</code> class
+   *
+   * @param spec The specification of this operator
+   */
+  public SimpleOperator(OperatorSpec spec) {
+    this.spec = spec;
+  }
+
+  @Override
+  public OperatorSpec getSpec() {
+    return this.spec;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
new file mode 100644
index 0000000..916b166
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.factory;
+
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.api.operators.SqlOperatorFactory;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.partition.PartitionOp;
+import org.apache.samza.sql.operators.partition.PartitionSpec;
+import org.apache.samza.sql.operators.relation.Join;
+import org.apache.samza.sql.operators.relation.JoinSpec;
+import org.apache.samza.sql.operators.stream.InsertStream;
+import org.apache.samza.sql.operators.stream.InsertStreamSpec;
+import org.apache.samza.sql.operators.window.BoundedTimeWindow;
+import org.apache.samza.sql.operators.window.WindowSpec;
+
+
+/**
+ * This simple factory class provides method to create the build-in operators 
per operator specification.
+ * It can be extended when the build-in operators expand.
+ *
+ */
+public class SimpleOperatorFactoryImpl implements SqlOperatorFactory {
+
+  @Override
+  public RelationOperator getRelationOperator(OperatorSpec spec) {
+    if (spec instanceof JoinSpec) {
+      return new Join((JoinSpec) spec);
+    } else if (spec instanceof InsertStreamSpec) {
+      return new InsertStream((InsertStreamSpec) spec);
+    }
+    throw new UnsupportedOperationException("Unsupported operator specified: " 
+ spec.getClass().getCanonicalName());
+  }
+
+  @Override
+  public TupleOperator getTupleOperator(OperatorSpec spec) {
+    if (spec instanceof WindowSpec) {
+      return new BoundedTimeWindow((WindowSpec) spec);
+    } else if (spec instanceof PartitionSpec) {
+      return new PartitionOp((PartitionSpec) spec);
+    }
+    throw new UnsupportedOperationException("Unsupported operator specified" + 
spec.getClass().getCanonicalName());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
new file mode 100644
index 0000000..93d4ebb
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.operators.factory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+
+
+/**
+ * An abstract class that encapsulate the basic information and methods that 
all specification of operators should implement.
+ *
+ */
+public abstract class SimpleOperatorSpec implements OperatorSpec {
+  /**
+   * The identifier of the corresponding operator
+   */
+  private final String id;
+
+  /**
+   * The list of input entity names of the corresponding operator
+   */
+  private final List<EntityName> inputs = new ArrayList<EntityName>();
+
+  /**
+   * The list of output entity names of the corresponding operator
+   */
+  private final List<EntityName> outputs = new ArrayList<EntityName>();
+
+  /**
+   * Ctor of the <code>SimpleOperatorSpec</code> for simple 
<code>Operator</code>s w/ one input and one output
+   *
+   * @param id Unique identifier of the <code>Operator</code> object
+   * @param input The only input entity
+   * @param output The only output entity
+   */
+  public SimpleOperatorSpec(String id, EntityName input, EntityName output) {
+    this.id = id;
+    this.inputs.add(input);
+    this.outputs.add(output);
+  }
+
+  /**
+   * Ctor of <code>SimpleOperatorSpec</code> with general format: m inputs and 
n outputs
+   *
+   * @param id Unique identifier of the <code>Operator</code> object
+   * @param inputs The list of input entities
+   * @param output The list of output entities
+   */
+  public SimpleOperatorSpec(String id, List<EntityName> inputs, EntityName 
output) {
+    this.id = id;
+    this.inputs.addAll(inputs);
+    this.outputs.add(output);
+  }
+
+  @Override
+  public String getId() {
+    return this.id;
+  }
+
+  @Override
+  public List<EntityName> getInputNames() {
+    return this.inputs;
+  }
+
+  @Override
+  public List<EntityName> getOutputNames() {
+    return this.outputs;
+  }
+
+  /**
+   * Method to get the first output entity
+   *
+   * @return The first output entity name
+   */
+  public EntityName getOutputName() {
+    return this.outputs.get(0);
+  }
+
+  /**
+   * Method to get the first input entity
+   *
+   * @return The first input entity name
+   */
+  public EntityName getInputName() {
+    return this.inputs.get(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
new file mode 100644
index 0000000..7921d4f
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.partition;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SqlMessageCollector;
+
+
+/**
+ * This is an example build-in operator that performs a simple stream 
re-partition operation.
+ *
+ */
+public final class PartitionOp extends SimpleOperator implements TupleOperator 
{
+
+  /**
+   * The specification of this <code>PartitionOp</code>
+   *
+   */
+  private final PartitionSpec spec;
+
+  /**
+   * Ctor that takes the <code>PartitionSpec</code> object as input.
+   *
+   * @param spec The <code>PartitionSpec</code> object
+   */
+  public PartitionOp(PartitionSpec spec) {
+    super(spec);
+    this.spec = spec;
+  }
+
+  /**
+   * A simplified constructor that allow users to randomly create 
<code>PartitionOp</code>
+   *
+   * @param id The identifier of this operator
+   * @param input The input stream name of this operator
+   * @param system The output system name of this operator
+   * @param output The output stream name of this operator
+   * @param parKey The partition key used for the output stream
+   * @param parNum The number of partitions used for the output stream
+   */
+  public PartitionOp(String id, String input, String system, String output, 
String parKey, int parNum) {
+    super(new PartitionSpec(id, input, new SystemStream(system, output), 
parKey, parNum));
+    this.spec = (PartitionSpec) super.getSpec();
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    // TODO Auto-generated method stub
+    // No need to initialize store since all inputs are immediately send out
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) 
throws Exception {
+    // TODO Auto-generated method stub
+    // NOOP or flush
+  }
+
+  @Override
+  public void process(Tuple tuple, SqlMessageCollector collector) throws 
Exception {
+    collector.send(new 
OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey(),
+        null /* TODO: when merge with Schema API changes, use: tuple
+             .getMessage().getFieldData(PartitionOp.this.spec.getParKey()) */, 
tuple.getMessage()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
new file mode 100644
index 0000000..29d1784
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.partition;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * This class defines the specification class of <code>PartitionOp</code> 
operator
+ *
+ */
+public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec {
+
+  /**
+   * The partition key name
+   */
+  private final String parKey;
+
+  /**
+   * The number of partitions
+   */
+  private final int parNum;
+
+  /**
+   * The <code>SystemStream</code> to send the partition output to
+   */
+  private final SystemStream sysStream;
+
+  /**
+   * Ctor to create the <code>PartitionSpec</code>
+   *
+   * @param id The ID of the <code>PartitionOp</code>
+   * @param input The input stream name
+   * @param output The output <code>SystemStream</code> object
+   * @param parKey The name of the partition key
+   * @param parNum The number of partitions
+   */
+  public PartitionSpec(String id, String input, SystemStream output, String 
parKey, int parNum) {
+    super(id, EntityName.getStreamName(input), 
EntityName.getStreamName(output.getSystem() + ":" + output.getStream()));
+    this.parKey = parKey;
+    this.parNum = parNum;
+    this.sysStream = output;
+  }
+
+  /**
+   * Method to get the partition key name
+   *
+   * @return The partition key name
+   */
+  public String getParKey() {
+    return this.parKey;
+  }
+
+  /**
+   * Method to get the number of partitions
+   *
+   * @return The number of partitions
+   */
+  public int getParNum() {
+    return this.parNum;
+  }
+
+  /**
+   * Method to get the output <code>SystemStream</code>
+   *
+   * @return The output system stream object
+   */
+  public SystemStream getSystemStream() {
+    return this.sysStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java
new file mode 100644
index 0000000..a8a6eaf
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.relation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SqlMessageCollector;
+
+
+/**
+ * This class defines an example build-in operator for a join operator between 
two relations.
+ *
+ */
+public class Join extends SimpleOperator implements RelationOperator {
+
+  private final JoinSpec spec;
+
+  /**
+   * The input relations
+   *
+   */
+  private List<Relation> inputs = null;
+
+  /**
+   * The output relation
+   */
+  private Relation output = null;
+
+  /**
+   * Ctor that creates <code>Join</code> operator based on the specification.
+   *
+   * @param spec The <code>JoinSpec</code> object that specifies the join 
operator
+   */
+  public Join(JoinSpec spec) {
+    super(spec);
+    this.spec = spec;
+  }
+
+  /**
+   * An alternative ctor that allows users to create a join operator randomly.
+   *
+   * @param id The identifier of the join operator
+   * @param joinIns The list of input relation names of the join
+   * @param joinOut The output relation name of the join
+   * @param joinKeys The list of keys used in the join. Each entry in the 
<code>joinKeys</code> is the key name used in one of the input relations.
+   *     The order of the <code>joinKeys</code> MUST be the same as their 
corresponding relation names in <code>joinIns</code>
+   */
+  @SuppressWarnings("serial")
+  public Join(final String id, final List<String> joinIns, final String 
joinOut, final List<String> joinKeys) {
+    super(new JoinSpec(id, new ArrayList<EntityName>() {
+      {
+        for (String name : joinIns) {
+          add(EntityName.getRelationName(name));
+        }
+      }
+    }, EntityName.getRelationName(joinOut), joinKeys));
+    this.spec = (JoinSpec) this.getSpec();
+  }
+
+  private boolean hasPendingChanges() {
+    return getPendingChanges() != null;
+  }
+
+  private Relation getPendingChanges() {
+    // TODO Auto-generated method stub
+    // return any pending changes that have not been processed yet
+    return null;
+  }
+
+  private Relation getOutputChanges() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private boolean hasOutputChanges() {
+    // TODO Auto-generated method stub
+    return getOutputChanges() != null;
+  }
+
+  private void join(Relation deltaRelation) {
+    // TODO Auto-generated method stub
+    // implement the join logic
+    // 1. calculate the delta changes in <code>output</code>
+    // 2. check output condition to see whether the current input should 
trigger an output
+    // 3. set the output changes and pending changes
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    for (EntityName relation : this.spec.getInputNames()) {
+      inputs.add((Relation) context.getStore(relation.toString()));
+    }
+    this.output = (Relation) 
context.getStore(this.spec.getOutputName().toString());
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) 
throws Exception {
+    SqlMessageCollector sqlCollector = (SqlMessageCollector) collector;
+    if (hasPendingChanges()) {
+      sqlCollector.send(getPendingChanges());
+    }
+    sqlCollector.timeout(this.spec.getOutputNames());
+  }
+
+  @Override
+  public void process(Relation deltaRelation, SqlMessageCollector collector) 
throws Exception {
+    // calculate join based on the input <code>deltaRelation</code>
+    join(deltaRelation);
+    if (hasOutputChanges()) {
+      collector.send(getOutputChanges());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
new file mode 100644
index 0000000..ba8bfb5
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.relation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
+
+
+/**
+ * This class implements specification class for the build-in 
<code>Join</code> operator
+ */
+public class JoinSpec extends SimpleOperatorSpec implements OperatorSpec {
+  /**
+   * Join keys defined for each input relation
+   */
+  private final List<String> joinKeys = new ArrayList<String>();
+
+  /**
+   * Default ctor for the <code>JoinSpec</code>
+   *
+   * @param id Unique ID of the <code>Join</code> object
+   * @param joinIns The list of input relations
+   * @param joinOut The output relation
+   * @param joinKeys The list of join keys in input relations
+   */
+  public JoinSpec(String id, List<EntityName> joinIns, EntityName joinOut, 
List<String> joinKeys) {
+    super(id, joinIns, joinOut);
+    this.joinKeys.addAll(joinKeys);
+  }
+
+  /**
+   * Method to get the list of join keys
+   *
+   * @return The list of join keys
+   */
+  public List<String> getJoinKeys() {
+    return this.joinKeys;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
new file mode 100644
index 0000000..7563100
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.stream;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SqlMessageCollector;
+
+
+/**
+ * This class defines an example build-in operator for an istream operator 
that converts a relation to a stream
+ *
+ */
+public class InsertStream extends SimpleOperator implements RelationOperator {
+  /**
+   * The <code>InsertStreamSpec</code> for this operator
+   */
+  private final InsertStreamSpec spec;
+
+  /**
+   * The time-varying relation that is to be converted into a stream
+   */
+  private Relation relation = null;
+
+  /**
+   * Ctor that takes the specication of the object as input parameter
+   *
+   * <p>This version of constructor is often used in an implementation of 
<code>SqlOperatorFactory</code>
+   *
+   * @param spec The <code>InsertStreamSpec</code> specification of this 
operator
+   */
+  public InsertStream(InsertStreamSpec spec) {
+    super(spec);
+    this.spec = spec;
+  }
+
+  /**
+   * An alternative ctor that allow users to create an 
<code>InsertStream</code> object randomly
+   *
+   * @param id The identifier of the <code>InsertStream</code> object
+   * @param input The input relation
+   * @param output The output stream
+   */
+  public InsertStream(String id, String input, String output) {
+    super(new InsertStreamSpec(id, EntityName.getRelationName(input), 
EntityName.getStreamName(output)));
+    this.spec = (InsertStreamSpec) super.getSpec();
+  }
+
+  @Override
+  public void process(Relation deltaRelation, SqlMessageCollector collector) 
throws Exception {
+    KeyValueIterator<Object, Tuple> iterator = deltaRelation.all();
+    for (; iterator.hasNext();) {
+      Tuple tuple = iterator.next().getValue();
+      if (!tuple.isDelete()) {
+        collector.send(tuple);
+      }
+    }
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    if (this.relation == null) {
+      this.relation = (Relation) 
context.getStore(this.spec.getInputName().toString());
+    }
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) 
throws Exception {
+    // TODO Auto-generated method stub
+    // assuming this operation does not have pending changes kept in memory
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
new file mode 100644
index 0000000..70475ce
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.stream;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
+
+
+/**
+ * Example implementation of specification of <code>InsertStream</code> 
operator
+ */
+public class InsertStreamSpec extends SimpleOperatorSpec implements 
OperatorSpec {
+
+  /**
+   * Default ctor of <code>InsertStreamSpec</code>
+   *
+   * @param id The identifier of the operator
+   * @param input The input relation entity
+   * @param output The output stream entity
+   */
+  public InsertStreamSpec(String id, EntityName input, EntityName output) {
+    super(id, input, output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
new file mode 100644
index 0000000..935ffc0
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.window;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SqlMessageCollector;
+
+
+/**
+ * This class defines an example build-in operator for a fixed size window 
operator that converts a stream to a relation
+ *
+ */
+public class BoundedTimeWindow extends SimpleOperator implements TupleOperator 
{
+
+  /**
+   * The specification of this window operator
+   */
+  private final WindowSpec spec;
+
+  /**
+   * The relation that the window operator keeps internally
+   */
+  private Relation relation = null;
+
+  /**
+   * The list of window states of all active windows the window operator keeps 
in track
+   */
+  private List<WindowState> windowStates = null;
+
+  /**
+   * Ctor that takes <code>WindowSpec</code> specification as input argument
+   *
+   * <p>This version of constructor is often used in an implementation of 
<code>SqlOperatorFactory</code>
+   *
+   * @param spec The window specification object
+   */
+  public BoundedTimeWindow(WindowSpec spec) {
+    super(spec);
+    this.spec = spec;
+  }
+
+  /**
+   * A simplified version of ctor that allows users to randomly created a 
window operator w/o spec object
+   *
+   * @param wndId The identifier of this window operator
+   * @param lengthSec The window size in seconds
+   * @param input The input stream name
+   * @param output The output relation name
+   */
+  public BoundedTimeWindow(String wndId, int lengthSec, String input, String 
output) {
+    super(new WindowSpec(wndId, EntityName.getStreamName(input), 
EntityName.getRelationName(output), lengthSec));
+    this.spec = (WindowSpec) super.getSpec();
+  }
+
+  @Override
+  public void process(Tuple tuple, SqlMessageCollector collector) throws 
Exception {
+    // for each tuple, this will evaluate the incoming tuple and update the 
window states.
+    // If the window states allow generating output, calculate the delta 
changes in
+    // the window relation and execute the relation operation 
<code>nextOp</code>
+    updateWindow(tuple);
+    processWindowChanges(collector);
+  }
+
+  private void processWindowChanges(SqlMessageCollector collector) throws 
Exception {
+    if (windowStateChange()) {
+      collector.send(getWindowChanges());
+    }
+  }
+
+  private Relation getWindowChanges() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private boolean windowStateChange() {
+    // TODO Auto-generated method stub
+    return getWindowChanges() != null;
+  }
+
+  private void updateWindow(Tuple tuple) {
+    // TODO Auto-generated method stub
+    // The window states are updated here
+    // And the correpsonding deltaChanges is also calculated here.
+  }
+
+  private void updateWindowTimeout() {
+    // TODO Auto-generated method stub
+    // The window states are updated here
+    // And the correpsonding deltaChanges is also calculated here.
+  }
+
+  @Override
+  public void window(MessageCollector collector, TaskCoordinator coordinator) 
throws Exception {
+    SqlMessageCollector sqlCollector = (SqlMessageCollector) collector;
+    updateWindowTimeout();
+    processWindowChanges(sqlCollector);
+    sqlCollector.timeout(this.spec.getOutputNames());
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    // TODO Auto-generated method stub
+    if (this.relation == null) {
+      this.relation = (Relation) 
context.getStore(this.spec.getOutputName().toString());
+      Relation wndStates = (Relation) 
context.getStore(this.spec.getWndStatesName());
+      this.windowStates = new ArrayList<WindowState>();
+      for (KeyValueIterator<Object, Tuple> iter = wndStates.all(); 
iter.hasNext();) {
+        this.windowStates.add((WindowState) 
iter.next().getValue().getMessage());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
new file mode 100644
index 0000000..e2ae3aa
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.window;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
+
+
+/**
+ * This class implements the specification class for the build-in 
<code>BoundedTimeWindow</code> operator
+ */
+public class WindowSpec extends SimpleOperatorSpec implements OperatorSpec {
+
+  /**
+   * The window size in seconds
+   */
+  private final int wndSizeSec;
+
+  /**
+   * Default ctor of the <code>WindowSpec</code> object
+   *
+   * @param id The identifier of the operator
+   * @param input The input stream entity
+   * @param output The output relation entity
+   * @param lengthSec The window size in seconds
+   */
+  public WindowSpec(String id, EntityName input, EntityName output, int 
lengthSec) {
+    super(id, input, output);
+    this.wndSizeSec = lengthSec;
+  }
+
+  /**
+   * Method to get the window state relation name
+   *
+   * @return The window state relation name
+   */
+  public String getWndStatesName() {
+    return this.getId() + "-wnd-state";
+  }
+
+  /**
+   * Method to get the window size in seconds
+   *
+   * @return The window size in seconds
+   */
+  public int getWndSizeSec() {
+    return this.wndSizeSec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
new file mode 100644
index 0000000..48547f0
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.window;
+
+public class WindowState {
+  public String startOffset = null;
+  public String endOffset = null;
+  public boolean isClosed = false;
+
+  public void open(String offset) {
+    this.isClosed = false;
+    this.startOffset = offset;
+  }
+
+  public void close(String offset) {
+    this.endOffset = offset;
+    this.isClosed = true;
+  }
+
+  public void advanceTo(String offset) {
+    this.endOffset = offset;
+  }
+
+  public boolean isClosed() {
+    return this.isClosed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
new file mode 100644
index 0000000..c6fc673
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.router;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.operators.Operator;
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.api.router.OperatorRouter;
+
+
+/**
+ * Example implementation of <code>OperatorRouter</code>
+ *
+ */
+public class SimpleRouter implements OperatorRouter {
+  /**
+   * List of operators added to the <code>OperatorRouter</code>
+   */
+  private List<Operator> operators = new ArrayList<Operator>();
+
+  @SuppressWarnings("rawtypes")
+  /**
+   * Map of <code>EntityName</code> to the list of operators associated with it
+   */
+  private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
+
+  /**
+   * List of <code>EntityName</code> as system inputs
+   */
+  private List<EntityName> inputEntities = new ArrayList<EntityName>();
+
+  @SuppressWarnings("unchecked")
+  private void addOperator(EntityName output, Operator nextOp) {
+    if (nextOps.get(output) == null) {
+      nextOps.put(output, new ArrayList<Operator>());
+    }
+    nextOps.get(output).add(nextOp);
+    operators.add(nextOp);
+
+  }
+
+  @Override
+  public Iterator<Operator> iterator() {
+    return operators.iterator();
+  }
+
+  @Override
+  public void addTupleOperator(EntityName outputStream, TupleOperator nextOp) 
throws Exception {
+    if (!outputStream.isStream()) {
+      throw new IllegalArgumentException("Can't attach an TupleOperator " + 
nextOp.getSpec().getId()
+          + " to a non-stream entity " + outputStream);
+    }
+    addOperator(outputStream, nextOp);
+  }
+
+  @Override
+  public void addRelationOperator(EntityName outputRelation, RelationOperator 
nextOp) throws Exception {
+    if (!outputRelation.isRelation()) {
+      throw new IllegalArgumentException("Can't attach an RelationOperator " + 
nextOp.getSpec().getId()
+          + " to a non-relation entity " + outputRelation);
+    }
+    addOperator(outputRelation, nextOp);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public List<RelationOperator> getRelationOperators(EntityName 
outputRelation) {
+    if (!outputRelation.isRelation()) {
+      throw new IllegalArgumentException("Can't get RelationOperators for a 
non-relation output: " + outputRelation);
+    }
+    return nextOps.get(outputRelation);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public List<TupleOperator> getTupleOperators(EntityName outputStream) {
+    if (!outputStream.isStream()) {
+      throw new IllegalArgumentException("Can't get TupleOperators for a 
non-stream output: " + outputStream);
+    }
+    return nextOps.get(outputStream);
+  }
+
+  @Override
+  public boolean hasNextOperators(EntityName output) {
+    return nextOps.get(output) != null && !nextOps.get(output).isEmpty();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public List<Operator> getNextOperators(EntityName output) {
+    return nextOps.get(output);
+  }
+
+  @Override
+  public void addSystemInput(EntityName input) {
+    if (!nextOps.containsKey(input) || nextOps.get(input).isEmpty()) {
+      throw new IllegalStateException("Can't set a system input w/o any next 
operators. input:" + input);
+    }
+    if (!inputEntities.contains(input)) {
+      inputEntities.add(input);
+    }
+  }
+
+  @Override
+  public List<EntityName> getSystemInputs() {
+    return this.inputEntities;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
 
b/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
new file mode 100644
index 0000000..1e5310f
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.Operator;
+import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.api.operators.TupleOperator;
+import org.apache.samza.sql.api.router.OperatorRouter;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Example implementation of a <code>SqlMessageCollector</code> that uses 
<code>OperatorRouter</code>
+ *
+ */
+public class OperatorMessageCollector implements SqlMessageCollector {
+
+  private final MessageCollector collector;
+  private final TaskCoordinator coordinator;
+  private final OperatorRouter rteCntx;
+
+  public OperatorMessageCollector(MessageCollector collector, TaskCoordinator 
coordinator, OperatorRouter rteCntx) {
+    this.collector = collector;
+    this.coordinator = coordinator;
+    this.rteCntx = rteCntx;
+  }
+
+  @Override
+  public void send(Relation deltaRelation) throws Exception {
+    for (RelationOperator op : 
this.rteCntx.getRelationOperators(deltaRelation.getName())) {
+      op.process(deltaRelation, this);
+    }
+  }
+
+  @Override
+  public void send(Tuple tuple) throws Exception {
+    for (TupleOperator op : 
this.rteCntx.getTupleOperators(tuple.getStreamName())) {
+      op.process(tuple, this);
+    }
+  }
+
+  @Override
+  public void timeout(List<EntityName> outputs) throws Exception {
+    for (EntityName output : outputs) {
+      for (Operator op : this.rteCntx.getNextOperators(output)) {
+        op.window(this, this.coordinator);
+      }
+    }
+  }
+
+  @Override
+  public void send(OutgoingMessageEnvelope envelope) {
+    this.collector.send(envelope);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java 
b/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
new file mode 100644
index 0000000..b98e2d7
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task.sql;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.task.MessageCollector;
+
+
+/**
+ * This class defines the interface class to be used by the operators to send 
their output via runtime system resources,
+ * s.t. the output system streams, the system storage, or 
<code>OperatorRouter</code>.
+ *
+ */
+public interface SqlMessageCollector extends MessageCollector {
+
+  /**
+   * This method allows the current operator send its relation output to next
+   *
+   * @param deltaRelation The delta <code>Relation</code> output generated by 
the current operator
+   * @throws Exception Throws exception if failed
+   */
+  void send(Relation deltaRelation) throws Exception;
+
+  /**
+   * This method allows the current operator send its tuple output to next
+   *
+   * @param tuple The <code>Tuple</code> object generated by the current 
operator
+   * @throws Exception Throws exception if failed
+   */
+  void send(Tuple tuple) throws Exception;
+
+  /**
+   * This method allows the current operator triggers timeout actions via the 
<code>SqlMessageCollector</code>.
+   *
+   * <p>This method sets timeout events to the corresponding 
<code>outputEntities</code> s.t. the next operators
+   * attached to those entities will be notified of the timeout.
+   *
+   * @param outputEntities The list of output entities via which the timeout 
event needs to be sent to
+   * @throws Exception Throws exception if failed
+   */
+  void timeout(List<EntityName> outputEntities) throws Exception;
+
+}

Reply via email to