SAMZA-482; create samza-sql module, and add a basic set of non-functional operators into it
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d4861df4 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d4861df4 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d4861df4 Branch: refs/heads/samza-sql Commit: d4861df4d4cd37f2d5ddc2db5e3158426de4139c Parents: 6743df3 Author: Yi Pan <[email protected]> Authored: Thu Feb 12 14:27:29 2015 -0800 Committer: Chris Riccomini <[email protected]> Committed: Thu Feb 12 14:27:29 2015 -0800 ---------------------------------------------------------------------- build.gradle | 20 +++ gradle/dependency-versions.gradle | 1 + samza-sql/README | 1 + .../apache/samza/sql/api/data/EntityName.java | 141 ++++++++++++++++ .../org/apache/samza/sql/api/data/Relation.java | 47 ++++++ .../org/apache/samza/sql/api/data/Tuple.java | 58 +++++++ .../samza/sql/api/operators/Operator.java | 43 +++++ .../sql/api/operators/RelationOperator.java | 51 ++++++ .../sql/api/operators/SqlOperatorFactory.java | 51 ++++++ .../samza/sql/api/operators/TupleOperator.java | 47 ++++++ .../sql/api/operators/spec/OperatorSpec.java | 64 ++++++++ .../samza/sql/api/router/OperatorRouter.java | 126 +++++++++++++++ .../samza/sql/data/IncomingMessageTuple.java | 74 +++++++++ .../sql/operators/factory/SimpleOperator.java | 50 ++++++ .../factory/SimpleOperatorFactoryImpl.java | 63 ++++++++ .../operators/factory/SimpleOperatorSpec.java | 106 +++++++++++++ .../sql/operators/partition/PartitionOp.java | 90 +++++++++++ .../sql/operators/partition/PartitionSpec.java | 91 +++++++++++ .../samza/sql/operators/relation/Join.java | 139 ++++++++++++++++ .../samza/sql/operators/relation/JoinSpec.java | 60 +++++++ .../sql/operators/stream/InsertStream.java | 98 ++++++++++++ .../sql/operators/stream/InsertStreamSpec.java | 42 +++++ .../sql/operators/window/BoundedTimeWindow.java | 141 ++++++++++++++++ .../samza/sql/operators/window/WindowSpec.java | 67 ++++++++ 
.../samza/sql/operators/window/WindowState.java | 44 +++++ .../apache/samza/sql/router/SimpleRouter.java | 133 ++++++++++++++++ .../task/sql/OperatorMessageCollector.java | 80 ++++++++++ .../samza/task/sql/SqlMessageCollector.java | 64 ++++++++ .../samza/task/sql/StoreMessageCollector.java | 80 ++++++++++ .../samza/task/sql/RandomOperatorTask.java | 151 ++++++++++++++++++ .../apache/samza/task/sql/StreamSqlTask.java | 159 +++++++++++++++++++ settings.gradle | 3 +- 32 files changed, 2384 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index b803276..e6b10fc 100644 --- a/build.gradle +++ b/build.gradle @@ -246,6 +246,26 @@ project(":samza-yarn_$scalaVersion") { jar.dependsOn("lesscss") } +project(":samza-sql_$scalaVersion") { + apply plugin: 'java' + + configurations { + // Remove transitive dependencies from Zookeeper that we don't want. 
+ compile.exclude group: 'javax.jms', module: 'jms' + compile.exclude group: 'com.sun.jdmk', module: 'jmxtools' + compile.exclude group: 'com.sun.jmx', module: 'jmxri' + } + + dependencies { + compile project(':samza-api') + compile project(":samza-core_$scalaVersion") + compile project(":samza-kv_$scalaVersion") + compile "commons-collections:commons-collections:$commonsCollectionVersion" + testCompile "junit:junit:$junitVersion" + testCompile "org.mockito:mockito-all:$mockitoVersion" + } +} + project(":samza-shell") { apply plugin: 'java' http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 84be50b..6f815b2 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -34,4 +34,5 @@ log4jVersion = "1.2.17" guavaVersion = "17.0" commonsCodecVersion = "1.9" + commonsCollectionVersion = "3.2.1" } http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/README ---------------------------------------------------------------------- diff --git a/samza-sql/README b/samza-sql/README new file mode 100644 index 0000000..65b7558 --- /dev/null +++ b/samza-sql/README @@ -0,0 +1 @@ +samza-sql is an experimental module that is under development (SAMZA-390). 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java new file mode 100644 index 0000000..127a677 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.api.data; + +import java.util.HashMap; +import java.util.Map; + + +/** + * This class defines the name scheme for the collective data entities in Samza Stream SQL, i.e. relations and streams. + */ +public class EntityName { + /** + * <code>EntityType</code> defines the types of the entity names + * + */ + private enum EntityType { + RELATION, + STREAM + }; + + /** + * Type of the entity name + */ + private final EntityType type; + + /** + * Formatted name of the entity. + * + * <p>This formatted name of the entity should be unique identifier for the corresponding relation/stream in the system. + * e.g. 
for a Kafka system stream named "mystream", the formatted name should be "kafka:mystream". + */ + private final String name; + + //TODO: we may want to replace the map with Guava cache to allow GC + /** + * Static map of already allocated relation names + */ + private static Map<String, EntityName> relations = new HashMap<String, EntityName>(); + + /** + * Static map of already allocated stream names + */ + private static Map<String, EntityName> streams = new HashMap<String, EntityName>(); + + /** + * Private ctor to create entity names + * + * @param type Type of the entity name + * @param name Formatted name of the entity + */ + private EntityName(EntityType type, String name) { + this.type = type; + this.name = name; + } + + @Override + public String toString() { + return String.format("%s:%s", this.type, this.name); + } + + @Override + public boolean equals(Object other) { + if (other instanceof EntityName) { + EntityName otherEntity = (EntityName) other; + return this.type.equals(otherEntity.type) && this.name.equals(otherEntity.name); + } + return false; + } + + /** + * Check to see whether this entity name is for a relation + * + * @return true if the entity type is <code>EntityType.RELATION</code>; false otherwise + */ + public boolean isRelation() { + return this.type.equals(EntityType.RELATION); + } + + /** + * Check to see whether this entity name is for a stream + * + * @return true if the entity type is <code>EntityType.STREAM</code>; false otherwise + */ + public boolean isStream() { + return this.type.equals(EntityType.STREAM); + } + + /** + * Get the formatted entity name + * + * @return The formatted entity name + */ + public String getName() { + return this.name; + } + + /** + * Static method to get the instance of <code>EntityName</code> with type <code>EntityType.RELATION</code> + * + * @param name The formatted entity name of the relation + * @return A <code>EntityName</code> for a relation + */ + public static EntityName getRelationName(String 
name) { + if (relations.get(name) == null) { + relations.put(name, new EntityName(EntityType.RELATION, name)); + } + return relations.get(name); + } + + /** + * Static method to get the instance of <code>EntityName</code> with type <code>EntityType.STREAM</code> + * + * @param name The formatted entity name of the stream + * @return A <code>EntityName</code> for a stream + */ + public static EntityName getStreamName(String name) { + if (streams.get(name) == null) { + streams.put(name, new EntityName(EntityType.STREAM, name)); + } + return streams.get(name); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java new file mode 100644 index 0000000..90b8026 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.api.data; + +import org.apache.samza.storage.kv.KeyValueStore; + + +/** + * This class defines the general interface of <code>Relation</code>, which is defined as a map of <code>Tuple</code>. + * + * <p>The interface is defined as an extension to <code>KeyValueStore<Object, Tuple></code>. + * + */ + +public interface Relation extends KeyValueStore<Object, Tuple> { + + /** + * Get the primary key field name for this table + * + * @return The name of the primary key field + */ + String getPrimaryKey(); + + /** + * Get the name of the relation created by CREATE TABLE + * + * @return The relation name + */ + EntityName getName(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java new file mode 100644 index 0000000..0c21a53 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.api.data; + +/** + * This class defines the generic interface of <code>Tuple</code>, which is an entry from the incoming stream, or one row in a <code>Relation</code>. + * + * <p>The <code>Tuple</code> models the basic operable unit in streaming SQL processes in Samza. + * + */ +public interface Tuple { + + /** + * Access method to get the corresponding message body in the tuple + * + * @return Message object in the tuple + */ + Object getMessage(); + + /** + * Method to indicate whether the tuple is a delete tuple or an insert tuple + * + * @return A boolean value indicating whether the current tuple is a delete or insert message + */ + boolean isDelete(); + + /** + * Access method to the key of the tuple + * + * @return The <code>key</code> of the tuple + */ + Object getKey(); + + /** + * Get the stream name of the tuple. Note this stream name should be unique in the system. + * + * @return The stream name which this tuple belongs to + */ + EntityName getStreamName(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java new file mode 100644 index 0000000..0169f2d --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.api.operators; + +import org.apache.samza.sql.api.operators.spec.OperatorSpec; +import org.apache.samza.task.InitableTask; +import org.apache.samza.task.WindowableTask; + + +/** + * This class defines the common interface for operator classes, no matter what the input data are. + * + * <p> It extends the <code>InitableTask</code> and <code>WindowableTask</code> to reuse the interface methods + * <code>init</code> and <code>window</code> for initialization and timeout operations + * + */ +public interface Operator extends InitableTask, WindowableTask { + + /** + * Method to get the specification of this <code>Operator</code> + * + * @return The <code>OperatorSpec</code> object that defines the configuration/parameters of the operator + */ + OperatorSpec getSpec(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java new file mode 100644 index 0000000..faa0a32 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.api.operators; + +import org.apache.samza.sql.api.data.Relation; +import org.apache.samza.task.sql.SqlMessageCollector; + + +/** + * This class defines the interface <code>RelationOperator</code>. + * + * <p>All operators implementing <code>RelationOperator</code> will take a <code>Relation</code> object as input. + * The SQL operators that need to implement this interface include: + * <ul> + * <li>All relation algebra operators, such as: join, select, where, group-by, having, limit, order-by, etc. + * <li>All relation-to-stream operators, which converts a relation to a stream + * </ul> + * + */ +public interface RelationOperator extends Operator { + + /** + * Method to perform a relational algebra on a set of relations, or a relation-to-stream function + * + * <p> The actual implementation of relational logic is performed by the implementation of this method. 
+ * The operator sends its output to the <code>collector</code> object. + * + * @param deltaRelation The changed rows in the input relation, including the inserts/deletes/updates + * @param collector The <code>SqlMessageCollector</code> object that accepts outputs from the operator + * @throws Exception Throws exception if failed + */ + void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java new file mode 100644 index 0000000..67671b9 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.samza.sql.api.operators; + +import org.apache.samza.sql.api.operators.spec.OperatorSpec; + + +/** + * This class defines the interface of SQL operator factory, which creates the following operators: + * <ul> + * <li><code>RelationOperator</code> that takes <code>Relation</code> as input variables + * <li><code>TupleOperator</code> that takes <code>Tuple</code> as input variables + * </ul> + * + */ +public interface SqlOperatorFactory { + + /** + * Interface method to create/get the <code>RelationOperator</code> object + * + * @param spec The specification of the <code>RelationOperator</code> object + * @return The relation operator object + */ + RelationOperator getRelationOperator(OperatorSpec spec); + + /** + * Interface method to create/get the <code>TupleOperator</code> object + * + * @param spec The specification of the <code>TupleOperator</code> object + * @return The tuple operator object + */ + TupleOperator getTupleOperator(OperatorSpec spec); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java new file mode 100644 index 0000000..ac4654e --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.api.operators; + +import org.apache.samza.sql.api.data.Tuple; +import org.apache.samza.task.sql.SqlMessageCollector; + + +/** + * This class defines the interface class that processes incoming tuples from input stream(s). + * + * <p>All operators implementing <code>TupleOperator</code> will take a <code>Tuple</code> object as input. + * The SQL operators that need to implement this interface include: + * <ul> + * <li>All stream-to-relation operators, such as: window operators. + * <li>All stream-to-stream operators, such as: re-partition, union of two streams + * </ul> + * + */ +public interface TupleOperator extends Operator { + /** + * Interface method to process on an input tuple. 
+ * + * @param tuple The input tuple, which has the incoming message from a stream + * @param collector The <code>SqlMessageCollector</code> object that accepts outputs from the operator + * @throws Exception Throws exception if failed + */ + void process(Tuple tuple, SqlMessageCollector collector) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java new file mode 100644 index 0000000..96385e2 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.api.operators.spec; + +import java.util.List; + +import org.apache.samza.sql.api.data.EntityName; + + +/** + * This class defines a generic specification interface class for all operators. 
+ * + * <p>The purpose of this class is to encapsulate all the details of configuration/parameters of a specific implementation of an operator. + * + * <p>The generic methods of an operator specification provide the unique ID, the list of entity names (i.e. stream name + * in <code>Tuple</code> or <code>Relation</code> name) of input variables, and the list of entity names of the output variables. + * + */ +public interface OperatorSpec { + /** + * Interface method that returns the unique ID of the operator in a task + * + * @return The unique ID of the <code>Operator</code> object + */ + String getId(); + + /** + * Access method to the list of entity names of input variables. + * + * <p>The input entity names are either stream names if the operator is a <code>TupleOperator</code>; + * or <code>Relation</code> names if the operator is a <code>RelationOperator</code> + * + * @return A list of entity names of the inputs + */ + List<EntityName> getInputNames(); + + /** + * Access method to the list of entity names of the output variables + * + * <p>The output entity names are either stream names if the operator generates tuples as an output stream; + * or <code>Relation</code> names if the operator generates a <code>Relation</code> as output.
+ * + * @return The entity name of the output + * + */ + List<EntityName> getOutputNames(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java b/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java new file mode 100644 index 0000000..2455a62 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.api.router; + +import java.util.Iterator; +import java.util.List; + +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.operators.Operator; +import org.apache.samza.sql.api.operators.RelationOperator; +import org.apache.samza.sql.api.operators.TupleOperator; + + +/** + * This interface class defines interface methods to connect operators together. 
+ * + * <p>The <code>OperatorRouter</code> allows the user to attach operators to a relation or a stream entity, + * if the corresponding relation/stream is included as inputs to the operator. Each operator then executes its own logic + * and determines which relation/stream to emit the output to. Through the <code>OperatorRouter</code>, the next + * operators attached to the corresponding output entities (i.e. relations/streams) can then be invoked to continue the + * stream process task. + * + * <p>The <code>OperatorRouter</code> also allows the user to set the system input entities (i.e. relations/streams) + * that are fed into the operators by the system outside the <code>OperatorRouter</code>, not generated by some + * operators in the <code>OperatorRouter</code>. + * + * <p>The methods included in this interface class allow a user to + * <ul> + * <li>i) add operators to an <code>EntityName</code> + * <li>ii) get the next operators attached to an <code>EntityName</code> + * <li>iii) add and get the system input <code>EntityName</code>s + * <li>iv) iterate through each and every operator connected via <code>OperatorRouter</code> + * </ul> + * + */ +public interface OperatorRouter { + + /** + * This method adds a <code>TupleOperator</code> as one of the input operators. + * + * @param stream The output stream entity name + * @param nextOp The <code>TupleOperator</code> that takes the tuples in the <code>stream</code> as an input. 
+ * @throws Exception Throws exception if failed + */ + void addTupleOperator(EntityName stream, TupleOperator nextOp) throws Exception; + + /** + * This method adds a <code>RelationOperator</code> as one of the input operators + + * @param relation The input relation entity name + * @param nextOp The <code>RelationOperator</code> that takes the <code>relation</code> as an input + * @throws Exception Throws exception if failed + */ + void addRelationOperator(EntityName relation, RelationOperator nextOp) throws Exception; + + /** + * This method gets the list of <code>RelationOperator</code>s attached to the <code>relation</code> + * + * @param relation The identifier of the relation entity + * @return The list of <code>RelationOperator</code> taking <code>relation</code> as an input variable + */ + List<RelationOperator> getRelationOperators(EntityName relation); + + /** + * This method gets the list of <code>TupleOperator</code>s attached to the <code>stream</code> + * + * @param stream The identifier of the stream entity + * @return The list of <code>TupleOperator</code> taking <code>stream</code> as an input variable + */ + List<TupleOperator> getTupleOperators(EntityName stream); + + /** + * This method gets the list of <code>Operator</code>s attached to an output entity (of any type) + * + * @param output The identifier of the output entity + * @return The list of <code>Operator</code> taking <code>output</code> as input variables + */ + List<Operator> getNextOperators(EntityName output); + + /** + * This method provides an iterator to go through all operators connected via <code>OperatorRouter</code> + * + * @return An <code>Iterator</code> for all operators connected via <code>OperatorRouter</code> + */ + Iterator<Operator> iterator(); + + /** + * This method checks to see whether there is any <code>Operator</code> attached to the entity <code>output</code> + * + * @param output The output entity name + * @return True if there is some operator attached to 
the <code>output</code>; false otherwise + */ + boolean hasNextOperators(EntityName output); + + /** + * This method adds an entity as the system input + * + * @param input The entity name for the system input + */ + void addSystemInput(EntityName input); + + /** + * This method returns the list of entities as system inputs + * + * @return The list of <code>EntityName</code>s as system inputs + */ + List<EntityName> getSystemInputs(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java new file mode 100644 index 0000000..a8a55e2 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.sql.data; + +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.data.Tuple; +import org.apache.samza.system.IncomingMessageEnvelope; + + +/** + * This class implements a <code>Tuple</code> class that encapsulates <code>IncomingMessageEnvelope</code> from the system + * + */ +public class IncomingMessageTuple implements Tuple { + /** + * Incoming message envelope + */ + private final IncomingMessageEnvelope imsg; + + /** + * The entity name for the incoming system stream + */ + private final EntityName strmEntity; + + /** + * Ctor to create a <code>IncomingMessageTuple</code> from <code>IncomingMessageEnvelope</code> + * + * @param imsg The incoming system message + */ + public IncomingMessageTuple(IncomingMessageEnvelope imsg) { + this.imsg = imsg; + this.strmEntity = + EntityName.getStreamName(String.format("%s:%s", imsg.getSystemStreamPartition().getSystem(), imsg + .getSystemStreamPartition().getStream())); + } + + // TODO: the return type should be changed to the generic data type + @Override + public Object getMessage() { + return this.imsg.getMessage(); + } + + @Override + public boolean isDelete() { + return false; + } + + @Override + public Object getKey() { + return imsg.getKey(); + } + + @Override + public EntityName getStreamName() { + return this.strmEntity; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java new file mode 100644 index 0000000..c634159 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.operators.factory; + +import org.apache.samza.sql.api.operators.Operator; +import org.apache.samza.sql.api.operators.spec.OperatorSpec; + + +/** + * An abstract class that encapsulate the basic information and methods that all operator classes should implement. 
+ * + */ +public abstract class SimpleOperator implements Operator { + /** + * The specification of this operator + */ + private final OperatorSpec spec; + + /** + * Ctor of <code>SimpleOperator</code> class + * + * @param spec The specification of this operator + */ + public SimpleOperator(OperatorSpec spec) { + this.spec = spec; + } + + @Override + public OperatorSpec getSpec() { + return this.spec; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java new file mode 100644 index 0000000..916b166 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.operators.factory; + +import org.apache.samza.sql.api.operators.RelationOperator; +import org.apache.samza.sql.api.operators.SqlOperatorFactory; +import org.apache.samza.sql.api.operators.TupleOperator; +import org.apache.samza.sql.api.operators.spec.OperatorSpec; +import org.apache.samza.sql.operators.partition.PartitionOp; +import org.apache.samza.sql.operators.partition.PartitionSpec; +import org.apache.samza.sql.operators.relation.Join; +import org.apache.samza.sql.operators.relation.JoinSpec; +import org.apache.samza.sql.operators.stream.InsertStream; +import org.apache.samza.sql.operators.stream.InsertStreamSpec; +import org.apache.samza.sql.operators.window.BoundedTimeWindow; +import org.apache.samza.sql.operators.window.WindowSpec; + + +/** + * This simple factory class provides method to create the build-in operators per operator specification. + * It can be extended when the build-in operators expand. + * + */ +public class SimpleOperatorFactoryImpl implements SqlOperatorFactory { + + @Override + public RelationOperator getRelationOperator(OperatorSpec spec) { + if (spec instanceof JoinSpec) { + return new Join((JoinSpec) spec); + } else if (spec instanceof InsertStreamSpec) { + return new InsertStream((InsertStreamSpec) spec); + } + throw new UnsupportedOperationException("Unsupported operator specified: " + spec.getClass().getCanonicalName()); + } + + @Override + public TupleOperator getTupleOperator(OperatorSpec spec) { + if (spec instanceof WindowSpec) { + return new BoundedTimeWindow((WindowSpec) spec); + } else if (spec instanceof PartitionSpec) { + return new PartitionOp((PartitionSpec) spec); + } + throw new UnsupportedOperationException("Unsupported operator specified" + spec.getClass().getCanonicalName()); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 
---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java new file mode 100644 index 0000000..93d4ebb --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.sql.operators.factory; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.operators.spec.OperatorSpec; + + +/** + * An abstract class that encapsulate the basic information and methods that all specification of operators should implement. 
+ * + */ +public abstract class SimpleOperatorSpec implements OperatorSpec { + /** + * The identifier of the corresponding operator + */ + private final String id; + + /** + * The list of input entity names of the corresponding operator + */ + private final List<EntityName> inputs = new ArrayList<EntityName>(); + + /** + * The list of output entity names of the corresponding operator + */ + private final List<EntityName> outputs = new ArrayList<EntityName>(); + + /** + * Ctor of the <code>SimpleOperatorSpec</code> for simple <code>Operator</code>s w/ one input and one output + * + * @param id Unique identifier of the <code>Operator</code> object + * @param input The only input entity + * @param output The only output entity + */ + public SimpleOperatorSpec(String id, EntityName input, EntityName output) { + this.id = id; + this.inputs.add(input); + this.outputs.add(output); + } + + /** + * Ctor of <code>SimpleOperatorSpec</code> with general format: m inputs and n outputs + * + * @param id Unique identifier of the <code>Operator</code> object + * @param inputs The list of input entities + * @param output The list of output entities + */ + public SimpleOperatorSpec(String id, List<EntityName> inputs, EntityName output) { + this.id = id; + this.inputs.addAll(inputs); + this.outputs.add(output); + } + + @Override + public String getId() { + return this.id; + } + + @Override + public List<EntityName> getInputNames() { + return this.inputs; + } + + @Override + public List<EntityName> getOutputNames() { + return this.outputs; + } + + /** + * Method to get the first output entity + * + * @return The first output entity name + */ + public EntityName getOutputName() { + return this.outputs.get(0); + } + + /** + * Method to get the first input entity + * + * @return The first input entity name + */ + public EntityName getInputName() { + return this.inputs.get(0); + } +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java new file mode 100644 index 0000000..7921d4f --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.operators.partition; + +import org.apache.samza.config.Config; +import org.apache.samza.sql.api.data.Tuple; +import org.apache.samza.sql.api.operators.TupleOperator; +import org.apache.samza.sql.operators.factory.SimpleOperator; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.sql.SqlMessageCollector; + + +/** + * This is an example build-in operator that performs a simple stream re-partition operation. + * + */ +public final class PartitionOp extends SimpleOperator implements TupleOperator { + + /** + * The specification of this <code>PartitionOp</code> + * + */ + private final PartitionSpec spec; + + /** + * Ctor that takes the <code>PartitionSpec</code> object as input. + * + * @param spec The <code>PartitionSpec</code> object + */ + public PartitionOp(PartitionSpec spec) { + super(spec); + this.spec = spec; + } + + /** + * A simplified constructor that allow users to randomly create <code>PartitionOp</code> + * + * @param id The identifier of this operator + * @param input The input stream name of this operator + * @param system The output system name of this operator + * @param output The output stream name of this operator + * @param parKey The partition key used for the output stream + * @param parNum The number of partitions used for the output stream + */ + public PartitionOp(String id, String input, String system, String output, String parKey, int parNum) { + super(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum)); + this.spec = (PartitionSpec) super.getSpec(); + } + + @Override + public void init(Config config, TaskContext context) throws Exception { + // TODO Auto-generated method stub + // No need to initialize store since all inputs are immediately send out + } + + 
@Override + public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { + // TODO Auto-generated method stub + // NOOP or flush + } + + @Override + public void process(Tuple tuple, SqlMessageCollector collector) throws Exception { + collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey(), + null /* TODO: when merge with Schema API changes, use: tuple + .getMessage().getFieldData(PartitionOp.this.spec.getParKey()) */, tuple.getMessage())); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java new file mode 100644 index 0000000..29d1784 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.operators.partition; + +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.operators.spec.OperatorSpec; +import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; +import org.apache.samza.system.SystemStream; + + +/** + * This class defines the specification class of <code>PartitionOp</code> operator + * + */ +public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec { + + /** + * The partition key name + */ + private final String parKey; + + /** + * The number of partitions + */ + private final int parNum; + + /** + * The <code>SystemStream</code> to send the partition output to + */ + private final SystemStream sysStream; + + /** + * Ctor to create the <code>PartitionSpec</code> + * + * @param id The ID of the <code>PartitionOp</code> + * @param input The input stream name + * @param output The output <code>SystemStream</code> object + * @param parKey The name of the partition key + * @param parNum The number of partitions + */ + public PartitionSpec(String id, String input, SystemStream output, String parKey, int parNum) { + super(id, EntityName.getStreamName(input), EntityName.getStreamName(output.getSystem() + ":" + output.getStream())); + this.parKey = parKey; + this.parNum = parNum; + this.sysStream = output; + } + + /** + * Method to get the partition key name + * + * @return The partition key name + */ + public String getParKey() { + return this.parKey; + } + + /** + * Method to get the number of partitions + * + * @return The number of partitions + */ + public int getParNum() { + return this.parNum; + } + + /** + * Method to get the output <code>SystemStream</code> + * + * @return The output system stream object + */ + public SystemStream getSystemStream() { + return this.sysStream; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java 
---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java new file mode 100644 index 0000000..a8a6eaf --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.operators.relation; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.samza.config.Config; +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.data.Relation; +import org.apache.samza.sql.api.operators.RelationOperator; +import org.apache.samza.sql.operators.factory.SimpleOperator; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.sql.SqlMessageCollector; + + +/** + * This class defines an example build-in operator for a join operator between two relations. 
+ * + */ +public class Join extends SimpleOperator implements RelationOperator { + + private final JoinSpec spec; + + /** + * The input relations + * + */ + private List<Relation> inputs = null; + + /** + * The output relation + */ + private Relation output = null; + + /** + * Ctor that creates <code>Join</code> operator based on the specification. + * + * @param spec The <code>JoinSpec</code> object that specifies the join operator + */ + public Join(JoinSpec spec) { + super(spec); + this.spec = spec; + } + + /** + * An alternative ctor that allows users to create a join operator randomly. + * + * @param id The identifier of the join operator + * @param joinIns The list of input relation names of the join + * @param joinOut The output relation name of the join + * @param joinKeys The list of keys used in the join. Each entry in the <code>joinKeys</code> is the key name used in one of the input relations. + * The order of the <code>joinKeys</code> MUST be the same as their corresponding relation names in <code>joinIns</code> + */ + @SuppressWarnings("serial") + public Join(final String id, final List<String> joinIns, final String joinOut, final List<String> joinKeys) { + super(new JoinSpec(id, new ArrayList<EntityName>() { + { + for (String name : joinIns) { + add(EntityName.getRelationName(name)); + } + } + }, EntityName.getRelationName(joinOut), joinKeys)); + this.spec = (JoinSpec) this.getSpec(); + } + + private boolean hasPendingChanges() { + return getPendingChanges() != null; + } + + private Relation getPendingChanges() { + // TODO Auto-generated method stub + // return any pending changes that have not been processed yet + return null; + } + + private Relation getOutputChanges() { + // TODO Auto-generated method stub + return null; + } + + private boolean hasOutputChanges() { + // TODO Auto-generated method stub + return getOutputChanges() != null; + } + + private void join(Relation deltaRelation) { + // TODO Auto-generated method stub + // implement the 
join logic + // 1. calculate the delta changes in <code>output</code> + // 2. check output condition to see whether the current input should trigger an output + // 3. set the output changes and pending changes + } + + @Override + public void init(Config config, TaskContext context) throws Exception { + for (EntityName relation : this.spec.getInputNames()) { + inputs.add((Relation) context.getStore(relation.toString())); + } + this.output = (Relation) context.getStore(this.spec.getOutputName().toString()); + } + + @Override + public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { + SqlMessageCollector sqlCollector = (SqlMessageCollector) collector; + if (hasPendingChanges()) { + sqlCollector.send(getPendingChanges()); + } + sqlCollector.timeout(this.spec.getOutputNames()); + } + + @Override + public void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception { + // calculate join based on the input <code>deltaRelation</code> + join(deltaRelation); + if (hasOutputChanges()) { + collector.send(getOutputChanges()); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java new file mode 100644 index 0000000..ba8bfb5 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.operators.relation; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.operators.spec.OperatorSpec; +import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; + + +/** + * This class implements specification class for the build-in <code>Join</code> operator + */ +public class JoinSpec extends SimpleOperatorSpec implements OperatorSpec { + /** + * Join keys defined for each input relation + */ + private final List<String> joinKeys = new ArrayList<String>(); + + /** + * Default ctor for the <code>JoinSpec</code> + * + * @param id Unique ID of the <code>Join</code> object + * @param joinIns The list of input relations + * @param joinOut The output relation + * @param joinKeys The list of join keys in input relations + */ + public JoinSpec(String id, List<EntityName> joinIns, EntityName joinOut, List<String> joinKeys) { + super(id, joinIns, joinOut); + this.joinKeys.addAll(joinKeys); + } + + /** + * Method to get the list of join keys + * + * @return The list of join keys + */ + public List<String> getJoinKeys() { + return this.joinKeys; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java ---------------------------------------------------------------------- diff 
--git a/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java new file mode 100644 index 0000000..7563100 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.operators.stream; + +import org.apache.samza.config.Config; +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.data.Relation; +import org.apache.samza.sql.api.data.Tuple; +import org.apache.samza.sql.api.operators.RelationOperator; +import org.apache.samza.sql.operators.factory.SimpleOperator; +import org.apache.samza.storage.kv.KeyValueIterator; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.sql.SqlMessageCollector; + + +/** + * This class defines an example build-in operator for an istream operator that converts a relation to a stream + * + */ +public class InsertStream extends SimpleOperator implements RelationOperator { + /** + * The <code>InsertStreamSpec</code> for this operator + */ + private final InsertStreamSpec spec; + + /** + * The time-varying relation that is to be converted into a stream + */ + private Relation relation = null; + + /** + * Ctor that takes the specication of the object as input parameter + * + * <p>This version of constructor is often used in an implementation of <code>SqlOperatorFactory</code> + * + * @param spec The <code>InsertStreamSpec</code> specification of this operator + */ + public InsertStream(InsertStreamSpec spec) { + super(spec); + this.spec = spec; + } + + /** + * An alternative ctor that allow users to create an <code>InsertStream</code> object randomly + * + * @param id The identifier of the <code>InsertStream</code> object + * @param input The input relation + * @param output The output stream + */ + public InsertStream(String id, String input, String output) { + super(new InsertStreamSpec(id, EntityName.getRelationName(input), EntityName.getStreamName(output))); + this.spec = (InsertStreamSpec) super.getSpec(); + } + + @Override + public void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception { + 
KeyValueIterator<Object, Tuple> iterator = deltaRelation.all(); + for (; iterator.hasNext();) { + Tuple tuple = iterator.next().getValue(); + if (!tuple.isDelete()) { + collector.send(tuple); + } + } + } + + @Override + public void init(Config config, TaskContext context) throws Exception { + if (this.relation == null) { + this.relation = (Relation) context.getStore(this.spec.getInputName().toString()); + } + } + + @Override + public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { + // TODO Auto-generated method stub + // assuming this operation does not have pending changes kept in memory + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java new file mode 100644 index 0000000..70475ce --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.operators.stream; + +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.operators.spec.OperatorSpec; +import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; + + +/** + * Example implementation of specification of <code>InsertStream</code> operator + */ +public class InsertStreamSpec extends SimpleOperatorSpec implements OperatorSpec { + + /** + * Default ctor of <code>InsertStreamSpec</code> + * + * @param id The identifier of the operator + * @param input The input relation entity + * @param output The output stream entity + */ + public InsertStreamSpec(String id, EntityName input, EntityName output) { + super(id, input, output); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java new file mode 100644 index 0000000..935ffc0 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.samza.sql.operators.window;

import java.util.ArrayList;
import java.util.List;

import org.apache.samza.config.Config;
import org.apache.samza.sql.api.data.EntityName;
import org.apache.samza.sql.api.data.Relation;
import org.apache.samza.sql.api.data.Tuple;
import org.apache.samza.sql.api.operators.TupleOperator;
import org.apache.samza.sql.operators.factory.SimpleOperator;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.sql.SqlMessageCollector;


/**
 * This class defines an example built-in operator for a fixed size window operator that converts a stream to a relation
 *
 * <p>The window bookkeeping itself (<code>updateWindow</code>, <code>updateWindowTimeout</code> and
 * <code>getWindowChanges</code>) is still a stub, so in its current form the operator never emits a delta relation.
 */
public class BoundedTimeWindow extends SimpleOperator implements TupleOperator {

  /**
   * The specification of this window operator
   */
  private final WindowSpec spec;

  /**
   * The relation that the window operator keeps internally
   */
  private Relation relation = null;

  /**
   * The list of window states of all active windows the window operator keeps track of
   */
  private List<WindowState> windowStates = null;

  /**
   * Ctor that takes <code>WindowSpec</code> specification as input argument
   *
   * <p>This version of constructor is often used in an implementation of <code>SqlOperatorFactory</code>
   *
   * @param spec The window specification object
   */
  public BoundedTimeWindow(WindowSpec spec) {
    super(spec);
    this.spec = spec;
  }

  /**
   * A simplified version of ctor that allows users to create a window operator w/o a spec object
   *
   * @param wndId The identifier of this window operator
   * @param lengthSec The window size in seconds
   * @param input The input stream name
   * @param output The output relation name
   */
  public BoundedTimeWindow(String wndId, int lengthSec, String input, String output) {
    super(new WindowSpec(wndId, EntityName.getStreamName(input), EntityName.getRelationName(output), lengthSec));
    this.spec = (WindowSpec) super.getSpec();
  }

  /**
   * Evaluates the incoming tuple against the window states and, if that produced a delta in the
   * window relation, sends the delta to the next operators via the collector.
   *
   * @param tuple The incoming tuple
   * @param collector The collector used to forward the delta relation
   * @throws Exception Throws exception if forwarding the delta failed
   */
  @Override
  public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
    updateWindow(tuple);
    processWindowChanges(collector);
  }

  /**
   * Sends the pending window changes, if there are any.
   *
   * <p>The delta is fetched exactly once per call, so the relation that was checked for
   * <code>null</code> is the same relation that is sent.
   */
  private void processWindowChanges(SqlMessageCollector collector) throws Exception {
    Relation delta = getWindowChanges();
    if (delta != null) {
      collector.send(delta);
    }
  }

  /**
   * Returns the delta relation accumulated by the latest window update.
   *
   * @return Currently always <code>null</code>, meaning there is no change to send
   */
  private Relation getWindowChanges() {
    // TODO Auto-generated method stub
    return null;
  }

  private void updateWindow(Tuple tuple) {
    // TODO Auto-generated method stub
    // The window states are updated here
    // And the corresponding deltaChanges is also calculated here.
  }

  private void updateWindowTimeout() {
    // TODO Auto-generated method stub
    // The window states are updated here
    // And the corresponding deltaChanges is also calculated here.
  }

  /**
   * Applies the timeout update to the window states, forwards any resulting delta, and then sends the
   * timeout event on to the operators attached to this operator's outputs.
   *
   * @param collector The collector; it is cast to <code>SqlMessageCollector</code>, so any other
   *     <code>MessageCollector</code> implementation fails with a <code>ClassCastException</code>
   * @param coordinator The task coordinator (not used by this operator)
   * @throws Exception Throws exception if forwarding the delta or the timeout failed
   */
  @Override
  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    SqlMessageCollector sqlCollector = (SqlMessageCollector) collector;
    updateWindowTimeout();
    processWindowChanges(sqlCollector);
    sqlCollector.timeout(this.spec.getOutputNames());
  }

  /**
   * Looks up the output relation store and loads the saved window states from the store named by
   * <code>WindowSpec.getWndStatesName()</code>.
   *
   * <p>Nothing is done once <code>relation</code> has been set, so repeated calls are harmless.
   *
   * @param config The job configuration (not used by this operator)
   * @param context The task context used to look up the stores
   * @throws Exception Throws exception if the stores cannot be read
   */
  @Override
  public void init(Config config, TaskContext context) throws Exception {
    if (this.relation != null) {
      return;
    }
    this.relation = (Relation) context.getStore(this.spec.getOutputName().toString());
    Relation wndStates = (Relation) context.getStore(this.spec.getWndStatesName());
    this.windowStates = new ArrayList<WindowState>();
    KeyValueIterator<Object, Tuple> iter = wndStates.all();
    try {
      while (iter.hasNext()) {
        this.windowStates.add((WindowState) iter.next().getValue().getMessage());
      }
    } finally {
      // store iterators hold on to underlying resources until they are closed
      iter.close();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java new file mode 100644 index 0000000..e2ae3aa --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.samza.sql.operators.window;

import org.apache.samza.sql.api.data.EntityName;
import org.apache.samza.sql.api.operators.spec.OperatorSpec;
import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;


/**
 * Specification class for the built-in <code>BoundedTimeWindow</code> operator
 */
public class WindowSpec extends SimpleOperatorSpec implements OperatorSpec {

  /**
   * Suffix appended to the operator id to form the name of the window state relation
   */
  private static final String WND_STATES_SUFFIX = "-wnd-state";

  /**
   * The window size in seconds
   */
  private final int wndSizeSec;

  /**
   * Creates a <code>WindowSpec</code> object
   *
   * @param id The identifier of the operator
   * @param input The input stream entity
   * @param output The output relation entity
   * @param lengthSec The window size in seconds
   */
  public WindowSpec(String id, EntityName input, EntityName output, int lengthSec) {
    super(id, input, output);
    this.wndSizeSec = lengthSec;
  }

  /**
   * Builds the window state relation name from the operator id
   *
   * @return The operator id followed by <code>-wnd-state</code>
   */
  public String getWndStatesName() {
    return getId() + WND_STATES_SUFFIX;
  }

  /**
   * Returns the window size this spec was created with
   *
   * @return The window size in seconds
   */
  public int getWndSizeSec() {
    return wndSizeSec;
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java new file mode 100644 index 0000000..48547f0 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.apache.samza.sql.operators.window;

/**
 * Mutable holder for the state of a single window: the offset at which it was opened, the offset
 * it last advanced to or was closed at, and whether it is closed.
 */
public class WindowState {
  /** Offset passed to the latest <code>open</code> call; <code>null</code> before the first one */
  public String startOffset = null;

  /** Offset passed to the latest <code>advanceTo</code> or <code>close</code> call; <code>null</code> before the first one */
  public String endOffset = null;

  /** <code>true</code> after <code>close</code>, reset to <code>false</code> by <code>open</code> */
  public boolean isClosed = false;

  /**
   * Opens (or re-opens) the window; the end offset is left untouched.
   *
   * @param offset The offset at which the window starts
   */
  public void open(String offset) {
    startOffset = offset;
    isClosed = false;
  }

  /**
   * Closes the window.
   *
   * @param offset The offset at which the window ends
   */
  public void close(String offset) {
    isClosed = true;
    endOffset = offset;
  }

  /**
   * Moves the end of the window without changing whether it is open or closed.
   *
   * @param offset The new end offset of the window
   */
  public void advanceTo(String offset) {
    endOffset = offset;
  }

  /**
   * Tells whether the window has been closed.
   *
   * @return <code>true</code> if the window is closed
   */
  public boolean isClosed() {
    return isClosed;
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java b/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java new file mode 100644 index 0000000..c6fc673 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

package org.apache.samza.sql.router;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.samza.sql.api.data.EntityName;
import org.apache.samza.sql.api.operators.Operator;
import org.apache.samza.sql.api.operators.RelationOperator;
import org.apache.samza.sql.api.operators.TupleOperator;
import org.apache.samza.sql.api.router.OperatorRouter;


/**
 * Example implementation of <code>OperatorRouter</code>
 *
 * <p>Operators are registered per entity. The lookup methods return an empty list, never
 * <code>null</code>, for an entity that has no operator attached, so callers can iterate over
 * the result directly.
 */
public class SimpleRouter implements OperatorRouter {
  /**
   * List of operators added to the <code>OperatorRouter</code>, in registration order
   */
  private final List<Operator> operators = new ArrayList<Operator>();

  /**
   * Map of <code>EntityName</code> to the list of operators associated with it
   */
  private final Map<EntityName, List<Operator>> nextOps = new HashMap<EntityName, List<Operator>>();

  /**
   * List of <code>EntityName</code> as system inputs
   */
  private final List<EntityName> inputEntities = new ArrayList<EntityName>();

  /**
   * Attaches <code>nextOp</code> to <code>output</code> and records it in the overall operator list.
   */
  private void addOperator(EntityName output, Operator nextOp) {
    List<Operator> attached = nextOps.get(output);
    if (attached == null) {
      attached = new ArrayList<Operator>();
      nextOps.put(output, attached);
    }
    attached.add(nextOp);
    operators.add(nextOp);
  }

  /**
   * Returns the operators attached to <code>output</code>, or an immutable empty list if there are none.
   */
  private List<Operator> attachedTo(EntityName output) {
    List<Operator> attached = nextOps.get(output);
    if (attached == null) {
      return Collections.<Operator>emptyList();
    }
    return attached;
  }

  /**
   * @return An iterator over every registered operator, in registration order
   */
  @Override
  public Iterator<Operator> iterator() {
    return operators.iterator();
  }

  /**
   * Attaches a <code>TupleOperator</code> to a stream entity.
   *
   * @param outputStream The stream entity that feeds <code>nextOp</code>
   * @param nextOp The operator to attach
   * @throws IllegalArgumentException If <code>outputStream</code> is not a stream
   */
  @Override
  public void addTupleOperator(EntityName outputStream, TupleOperator nextOp) throws Exception {
    if (!outputStream.isStream()) {
      throw new IllegalArgumentException("Can't attach an TupleOperator " + nextOp.getSpec().getId()
          + " to a non-stream entity " + outputStream);
    }
    addOperator(outputStream, nextOp);
  }

  /**
   * Attaches a <code>RelationOperator</code> to a relation entity.
   *
   * @param outputRelation The relation entity that feeds <code>nextOp</code>
   * @param nextOp The operator to attach
   * @throws IllegalArgumentException If <code>outputRelation</code> is not a relation
   */
  @Override
  public void addRelationOperator(EntityName outputRelation, RelationOperator nextOp) throws Exception {
    if (!outputRelation.isRelation()) {
      throw new IllegalArgumentException("Can't attach an RelationOperator " + nextOp.getSpec().getId()
          + " to a non-relation entity " + outputRelation);
    }
    addOperator(outputRelation, nextOp);
  }

  /**
   * @param outputRelation The relation entity to look up
   * @return The operators attached to <code>outputRelation</code>; empty if there are none
   * @throws IllegalArgumentException If <code>outputRelation</code> is not a relation
   */
  @SuppressWarnings("unchecked")
  @Override
  public List<RelationOperator> getRelationOperators(EntityName outputRelation) {
    if (!outputRelation.isRelation()) {
      throw new IllegalArgumentException("Can't get RelationOperators for a non-relation output: " + outputRelation);
    }
    // unchecked: relation entities only receive operators through addRelationOperator
    return (List<RelationOperator>) (List<?>) attachedTo(outputRelation);
  }

  /**
   * @param outputStream The stream entity to look up
   * @return The operators attached to <code>outputStream</code>; empty if there are none
   * @throws IllegalArgumentException If <code>outputStream</code> is not a stream
   */
  @SuppressWarnings("unchecked")
  @Override
  public List<TupleOperator> getTupleOperators(EntityName outputStream) {
    if (!outputStream.isStream()) {
      throw new IllegalArgumentException("Can't get TupleOperators for a non-stream output: " + outputStream);
    }
    // unchecked: stream entities only receive operators through addTupleOperator
    return (List<TupleOperator>) (List<?>) attachedTo(outputStream);
  }

  /**
   * @param output The entity to look up
   * @return <code>true</code> if at least one operator is attached to <code>output</code>
   */
  @Override
  public boolean hasNextOperators(EntityName output) {
    return !attachedTo(output).isEmpty();
  }

  /**
   * @param output The entity to look up
   * @return The operators attached to <code>output</code>; empty if there are none
   */
  @Override
  public List<Operator> getNextOperators(EntityName output) {
    return attachedTo(output);
  }

  /**
   * Registers <code>input</code> as a system input; registering the same entity again has no effect.
   *
   * @param input The entity to register
   * @throws IllegalStateException If no operator is attached to <code>input</code> yet
   */
  @Override
  public void addSystemInput(EntityName input) {
    if (!hasNextOperators(input)) {
      throw new IllegalStateException("Can't set a system input w/o any next operators. input:" + input);
    }
    if (!inputEntities.contains(input)) {
      inputEntities.add(input);
    }
  }

  /**
   * @return The list of registered system inputs (the router's own list, not a copy)
   */
  @Override
  public List<EntityName> getSystemInputs() {
    return this.inputEntities;
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java b/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java new file mode 100644 index 0000000..1e5310f --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

package org.apache.samza.task.sql;

import java.util.List;

import org.apache.samza.sql.api.data.EntityName;
import org.apache.samza.sql.api.data.Relation;
import org.apache.samza.sql.api.data.Tuple;
import org.apache.samza.sql.api.operators.Operator;
import org.apache.samza.sql.api.operators.RelationOperator;
import org.apache.samza.sql.api.operators.TupleOperator;
import org.apache.samza.sql.api.router.OperatorRouter;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;


/**
 * Example implementation of a <code>SqlMessageCollector</code> that uses <code>OperatorRouter</code>
 *
 * <p>Relations, tuples and timeouts are passed to the operators that the router has attached to the
 * corresponding entity; outgoing envelopes go to the wrapped <code>MessageCollector</code>. An entity
 * for which the router returns <code>null</code> or an empty list is treated as having no next
 * operator, and nothing is done for it.
 */
public class OperatorMessageCollector implements SqlMessageCollector {

  private final MessageCollector collector;
  private final TaskCoordinator coordinator;
  private final OperatorRouter rteCntx;

  /**
   * @param collector The collector that receives outgoing message envelopes
   * @param coordinator The coordinator handed to the next operators on timeout
   * @param rteCntx The router used to find the next operators of an entity
   */
  public OperatorMessageCollector(MessageCollector collector, TaskCoordinator coordinator, OperatorRouter rteCntx) {
    this.collector = collector;
    this.coordinator = coordinator;
    this.rteCntx = rteCntx;
  }

  /**
   * Passes the delta relation to every <code>RelationOperator</code> attached to its name.
   *
   * @param deltaRelation The delta relation to process
   * @throws Exception Throws exception if one of the operators failed
   */
  @Override
  public void send(Relation deltaRelation) throws Exception {
    List<RelationOperator> nextOps = this.rteCntx.getRelationOperators(deltaRelation.getName());
    if (nextOps == null) {
      // the router has nothing attached to this relation
      return;
    }
    for (RelationOperator op : nextOps) {
      op.process(deltaRelation, this);
    }
  }

  /**
   * Passes the tuple to every <code>TupleOperator</code> attached to its stream name.
   *
   * @param tuple The tuple to process
   * @throws Exception Throws exception if one of the operators failed
   */
  @Override
  public void send(Tuple tuple) throws Exception {
    List<TupleOperator> nextOps = this.rteCntx.getTupleOperators(tuple.getStreamName());
    if (nextOps == null) {
      // the router has nothing attached to this stream
      return;
    }
    for (TupleOperator op : nextOps) {
      op.process(tuple, this);
    }
  }

  /**
   * Calls <code>window</code> on every operator attached to one of the given outputs.
   *
   * @param outputs The entities whose next operators are notified
   * @throws Exception Throws exception if one of the operators failed
   */
  @Override
  public void timeout(List<EntityName> outputs) throws Exception {
    for (EntityName output : outputs) {
      List<Operator> nextOps = this.rteCntx.getNextOperators(output);
      if (nextOps == null) {
        // an output without downstream operators has nobody to notify
        continue;
      }
      for (Operator op : nextOps) {
        op.window(this, this.coordinator);
      }
    }
  }

  /**
   * Hands the envelope to the wrapped <code>MessageCollector</code>.
   *
   * @param envelope The outgoing message envelope
   */
  @Override
  public void send(OutgoingMessageEnvelope envelope) {
    this.collector.send(envelope);
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/d4861df4/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java b/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java new file mode 100644 index 0000000..b98e2d7 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task.sql; + +import java.util.List; + +import org.apache.samza.sql.api.data.EntityName; +import org.apache.samza.sql.api.data.Relation; +import org.apache.samza.sql.api.data.Tuple; +import org.apache.samza.task.MessageCollector; + + +/** + * This class defines the interface class to be used by the operators to send their output via runtime system resources, + * s.t. the output system streams, the system storage, or <code>OperatorRouter</code>. 
+ * + */ +public interface SqlMessageCollector extends MessageCollector { + + /** + * This method allows the current operator send its relation output to next + * + * @param deltaRelation The delta <code>Relation</code> output generated by the current operator + * @throws Exception Throws exception if failed + */ + void send(Relation deltaRelation) throws Exception; + + /** + * This method allows the current operator send its tuple output to next + * + * @param tuple The <code>Tuple</code> object generated by the current operator + * @throws Exception Throws exception if failed + */ + void send(Tuple tuple) throws Exception; + + /** + * This method allows the current operator triggers timeout actions via the <code>SqlMessageCollector</code>. + * + * <p>This method sets timeout events to the corresponding <code>outputEntities</code> s.t. the next operators + * attached to those entities will be notified of the timeout. + * + * @param outputEntities The list of output entities via which the timeout event needs to be sent to + * @throws Exception Throws exception if failed + */ + void timeout(List<EntityName> outputEntities) throws Exception; + +}
