Sorry about the confusion, Milinda. Thanks for reverting, Yi!
On 6/1/15, 10:53 PM, "Yi Pan" <[email protected]> wrote: >Hi, Milinda, > >That was an accidental mistake. I have reverted the check-in. I am still >working on that. Thanks! > >-Yi > >On Mon, Jun 1, 2015 at 9:34 PM, Milinda Pathirage <[email protected]> >wrote: > >> Hi Navina, >> >> Did we decided to push this patch to samza-sql branch. I thought Yi is >> still working on this. Some Git conflict related texts are still there >>in >> this commit. >> >> +<<<<<<< HEAD >> + * The callback object >> +======= >> + * The callback function >> +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of >> callbacks w/o inheriting and creating many sub-classes from operators >> >> Milinda >> >> On Mon, Jun 1, 2015 at 9:06 PM, <[email protected]> wrote: >> >> > Yi's TopologyBuilder RB 34500 >> > >> > >> > Project: http://git-wip-us.apache.org/repos/asf/samza/repo >> > Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/45b85477 >> > Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/45b85477 >> > Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/45b85477 >> > >> > Branch: refs/heads/samza-sql >> > Commit: 45b854772cf36cc69e8d8cda7a51bce1be5fe576 >> > Parents: 41c4cd0 >> > Author: Navina <[email protected]> >> > Authored: Thu May 28 18:51:30 2015 -0700 >> > Committer: Navina <[email protected]> >> > Committed: Thu May28 18:51:30 2015 -0700 >> > >> > ---------------------------------------------------------------------- >> > .../apache/samza/sql/api/data/EntityName.jaa | 41 ++- >> > .../org/apache/samza/sql/api/data/Table.java | 7 +- >> > .../samza/sql/api/operators/Operator.java | 4 + >> > .../sql/api/operators/OperatorCallback.java | 1 - >> > .../samza/sql/api/operators/OperatorRouter.java | 8 + >> > .../samza/sql/api/operators/OperatorSink.java | 30 ++ >> > .../samza/sql/api/operators/OperatorSource.java | 30 ++ >> > .../samza/sql/api/operators/SimpleOperator.java | 3 +- >> > .../samza/sql/data/IncomingMessageTuple.java | 1 - >> > .../sql/operators/NoopOperatorCallback.java | 53 ++++ >> > .../samza/sql/operators/OperatorTopology.java | 53 ++++ >> > .../samza/sql/operators/SimpleOperatorImpl.java | 147 ++++++++++ >> > .../samza/sql/operators/SimpleOperatorSpec.java | 106 +++++++ >> > .../samza/sql/operators/SimpleRouter.java | 141 +++++++++ >> > .../operators/factory/NoopOperatorCallback.java | 50 ---- >> > .../operators/factory/SimpleOperatorImpl.java | 136 --------- >> > .../operators/factory/SimpleOperatorSpec.java | 106 ------- >> > .../sql/operators/factory/SimpleRouter.java | 136 --------- >> > .../sql/operators/factory/TopologyBuilder.java | 284 >> +++++++++++++++++++ >> > .../sql/operators/join/StreamStreamJoin.java | 3 +- >> > .../operators/join/StreamStreamJoinSpec.java | 15 +- >> > .../sql/operators/partition/PartitionOp.java | 3 +- >> > .../sql/operators/partition/PartitionSpec.java | 2 +- >> > .../sql/operators/window/BoundedTimeWindow.java | 4 +- >> > .../samza/sql/operators/window/WindowSpec.java | 7 +- >> > .../samza/task/sql/SimpleMessageCollector.java | 37 ++- >> > .../task/sql/RandomWindowOperatorTask.java | 11 +- >> > .../apache/samza/task/sql/StreamSqlTask.java | 26 +- >> > .../samza/task/sql/UserCallbacksSqlTask.java | 66 ++--- >> > 29 files changed, 991 insertions(+), 520 deletions(-) >> > ---------------------------------------------------------------------- >> > >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/api/data/EntityName.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j >>ava >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j >>ava >> > index 80ba455..df1b11b 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j >>ava >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j >>ava >> > @@ -49,6 +49,8 @@ public class EntityName { >> > */ >> > private final String name; >> > >> > + private final boolean isSystemEntity; >> > + >> > /** >> > * Static map of already allocated table names >> > */ >> > @@ -59,15 +61,19 @@ public class EntityName { >> > */ >> > private static Map<String, EntityName> streams = new >>HashMap<String, >> > EntityName>(); >> > >> > + private static final String ANONYMOUS = "anonymous"; >> > + >> > /** >> > * Private ctor to create entity names >> > * >> > * @param type Type of the entity name >> > * @param name Formatted name of the entity >> > + * @param isSystemEntity whether the entity is a system >>input/output >> > */ >> > - private EntityName(EntityType type, String name) { >> > + private EntityName(EntityType type, String name, boolean >> > isSystemEntity) { >> > this.type = type; >> > this.name = name; >> > + this.isSystemEntity = isSystemEntity; >> > } >> > >> > @Override >> > @@ -10,6 +108,10 @@ public class EntityName { >> > return this.type.equals(EntityType.STREAM); >> > } >> > >> > + public boolean isSysteEntity() { >> > + return this.isSystemEntity; >> > + } >> > + >> > /** >> > * Get the formatted entity name >> > * >> > @@ -111,15 +121,24 @@ public class EntityName { >> > return this.name; >> > } >> > >> > + public static EntityName getTableName(String name) { >> > + return getTableName(name, false); >> > + } >> > + >> > + public static EntityName getStreamName(Sting name) { >> > + return getStreamName(name, false); >> > + } >> > + >> > /** >> > * Static method to get the instance of {@code EntityName} with >>type >> > {@code EntityType.TABLE} >> > * >> > * @param name The formatted entity name of the relation >> > + * @param isSystem The boolean flag indicating whether this is a >> system >> > input/output >> > * @return A <code>EntityName</code> for a relation >> > */ >> > - public static EntityName getTableName(String name) { >> > + public static EntityName getTableName(String name, boolean >>isSystem) { >> > if (tables.get(name) == null) { >> > - tables.put(name, new EntityName(EntityType.TABLE, name)); >> > + tables.put(name, new EntityName(EntityType.TABLE, name, >> isSystem)); >> > } >> > return tables.get(name); >> > } >> > @@ -128,13 +147,25 @@ public class EntityName { >> > * Static method to get the instance of <code>EntityName</code> >>with >> > type <code>EntityType.STREAM</code> >> > * >> > * @param name The formatted ntity name of the stream >> > + * @param isSystem The boolean flag indicating whethr this is a >> system >> > input/output >> > * @return A <code>EntityName</code> for a stream >> > */ >> > - public static EntityName getStreamName(String name) { >> > + public static EntityName getStreamName(String name, boolean >>isSystem) >> { >> > if (streams.get(name) == null) { >> > - streams.put(name, new EntityName(EntityType.STREAM, name)); >> > + streams.put(name, new EntityName(EntityType.STREAM, name, >>> isSystem)); >> > } >> > return streams.get(name); >> > } >> > >> > + public static EntityName getAnonymousStream() { >> > + return getStreamName(ANONYMOUS); >> > + } >> > + >> > + public static EntityName getAnonymousTable() { >> > + return getTableName(ANONYMOUS); >> > + } >> > + >> > + public boolean isAnonymous() { >> > + return this.name.equals(ANONYMOUS); >> > + } >> > } >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/api/data/Table.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java >> > index 7b4d984..b4dce07 100644 >> > --- >> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java >> > +++ >> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java >> > @@ -19,6 +19,9 @@ >> > >> > packag org.apache.samza.sql.api.data; >> > >> > +import java.util.List; >> > + >> > + >> > /** >> > * This interface defines a non-ordered {@link >> > org.apache.samza.sql.api.data.Relation}, which has a unique primary >>key >> > * >> > @@ -31,8 +34,8 @@ public interface Table<K> extends Relation<K> { >> > /** >> > * Get the primary key field name for this table >> > * >> > - * @return The name of the primary key fild >> > + * @return The names of the primary key fields >> > */ >> > - String getPrimaryKeyName(); >> > + List<String> getPrimaryKeyNames(); >> > >> > } >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/api/operators/Operator.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>r.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>r.java >> > index d6f6b57.9c6eaa5 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>r.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>r.java >> > @@ -27,7 +27,11 @@ import org.apache.samza.task.TaskContext; >> > import org.apache.samza.task.TasCoordinator; >> > >> > >> > +/** >> > + * This class defines the common interface for operator classes. >> > + */ >> > public interface Operator { >> > + >> > /** >> > * Method to initialize the operator >> > * >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rCallback.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rCallback.java >> > index fb2aa89..5a77d95 100644 >> > --- >> > >> >>a/samza-sql-core/sr/main/java/org/apache/samza/sql/api/operators/Operato >>rCallback.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rCallback.java >> > @@ -23,7 +23,6 @@ import org.apache.saza.sql.api.data.Tuple; >> > import org.apache.samza.task.MessageCollector; >> > import org.apache.samza.task.TaskCoordinator; >> > >> > - >> > /** >> > * Defines the callback functions to allow customized functions to be >> >invoked before process and before sending the result >> > */ >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java >> > --------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rRuter.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rRouter.java >> > index 0759638..432e6b3 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rRouter.java >> > +++ >> > >> >>b/sama-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rRouter.java >> > @@ -19,6 +19,7 @@ >> > >> > package org.apache.samza.sql.api.operators; >> > >> > +import java.util.Iterator; >> > mport java.util.List; >> > >> > import org.apache.samza.sql.api.data.EntityName; >> > @@ -51,4 +52,11 @ public interface OperatorRouter extends Operator { >> > */ >> > List<SimpleOperator> getNextOperators(EntityName output); >> > >> > + /** >> > + * This method provides an iterator to go through all operators >> > connected via {@code OperatorRouter} >> > + * >> > + * @return An {@link java.util.Iterator} for all operators >>connected >> > via {@code OperatorRouter} >> > + */ >> > + Iterator<SimpleOperator> iterator(); >> > + >> > } >> > >> > >> > >> >>http://git-wip-us.apache.og/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rSink.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rSink.java >> > new file mode 100644 >> > index 0000000..e2c748c >> > --- /dev/null >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rSink.java >> > @@ -0,0 +1,30 @@ >> > +/* >> > + * Licensed to the Apache Software Foundation (ASF) under one >> > + * or more contributor license agreements. See the NOTICE file >> > + * distributed with this work for additional information >> > + * regarding copyright ownership. The ASF licenses this file >> > + * to you under the Apache License, Version 2.0 (the >> > + * "License"); you may not use this file except in compliance >> > + * with the License. You may obtain a copy of the License at >> > + * >> > + * http://www.apache.org/licenses/LICENSE-2.0 >> > + * >> > + * Unless required by applicable law or agreed to in writing, >> > + * software distributed under the License is distributed on an >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > + * KIND, either express or implied. See the License for the >> > + * specific language governing permissions and limitations >> > + * under the License. >> > + */ >> > +package org.apache.samza.sql.api.operators; >> > + >> > +import java.util.Iterator; >> > + >> > +import org.apache.samza.sql.api.data.EntityName; >> > + >> > + >> > +public interface OperatorSink { >> > + Iterator<SimpleOperator> opIterator(); >> > + >> > + EntityName getName(); >> > +} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rSource.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rSource.java >> > new file mode 100644 >> > index 0000000..860c1aa >> > --- /dev/null >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato >>rSource.java >> > @@ -0,0 +1,30 @@ >> > +/* >> > + * Licensed to the Apache Software Foundation (ASF) under one >> > + * or more contributor license agreements. See the NOTICE file >> > + * distributed with this work for additional information >> > + * regarding copyright ownership. The ASF licenses this file >> > + * to you under the Apache License, Version 2.0 (the >> > + * "License"); you may not use this file except in compliance >> > + * with the License. You may obtain a copy of the License at >> > + * >> > + * http://www.apache.org/licenses/LICENSE-2.0 >> > + * >> > + * Unless required by applicable law or agreed to in writing, >> > + * software distributed under the License is distributed on an >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > + * KIND, either express or implied. See the License for the >> > + * specific language governing permissions and limitations >> > + * under the License. >> > + */ >> > +package org.apache.samza.sql.api.operators; >> > + >> > +import java.util.Iterator; >> > + >> > +import org.apache.samza.sql.api.data.EntityName; >> > + >> > + >> > +public interface OperatorSource { >> > + Iterator<SimpleOperator> opIterator(); >> > + >> > + EntityName getName(); >> > +} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO >>perator.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO >>perator.java >> > index c49a822..60ace9c 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO >>perator.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO >>perator.java >> > @@ -19,8 +19,6 @@ >> > >> > package org.apache.samza.sql.api.operators; >> > >> > - >> > - >> > /** >> > * The interface for a {@code SimpleOperator} that implements a >>simple >> > primitive relational logic operation >> > */ >> > @@ -31,4 +29,5 @@ public interface SimpleOperator extends Operator { >> > * @return The {@link >>org.apache.samza.sql.api.operators.OperatorSpec} >> > object that defines the configuration/parameters of the operator >> > */ >> > OperatorSpec getSpec(); >> > + >> > } >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT >>uple.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT >>uple.java >> > index 72a59f2..af040f0 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT >>uple.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT >>uple.java >> > @@ -81,7 +81,6 @@ public class IncomingMessageTuple implements Tuple { >> > >> > @Override >> > public long getCreateTimeNano() { >> > - // TODO: this is wrong and just to keep as an placeholder. It >>should >> > be replaced by the message publish time when the publish timestamp is >> > available in the message metadata >> > return this.recvTimeNano; >> > } >> > >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato >>rCallback.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato >>rCallback.java >> > new file mode 100644 >> > index 0000000..e951737 >> > --- /dev/null >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato >>rCallback.java >> > @@ -0,0 +1,53 @@ >> > +/* >> > + * Licensed to the Apache Software Foundation (ASF) under one >> > + * or more contributor license agreements. See the NOTICE file >> > + * distributed with this work for additional information >> > + * regarding copyright ownership. The ASF licenses this file >> > + * to you under the Apache License, Version 2.0 (the >> > + * "License"); you may not use this file except in compliance >> > + * with the License. You may obtain a copy of the License at >> > + * >> > + * http://www.apache.org/licenses/LICENSE-2.0 >> > + * >> > + * Unless required by applicable law or agreed to in writing, >> > + * software distributed under the License is distributed on an >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > + * KIND, either express or implied. See the License for the >> > + * specific language governing permissions and limitations >> > + * under the License. >> > + */ >> > +package org.apache.samza.sql.operators; >> > + >> > +import org.apache.samza.sql.api.data.Relation; >> > +import org.apache.samza.sql.api.data.Tuple; >> > +import org.apache.samza.sql.api.operators.OperatorCallback; >> > +import org.apache.samza.task.MessageCollector; >> > +import org.apache.samza.task.TaskCoordinator; >> > + >> > + >> > +/** >> > + * This is a default NOOP operator callback object that does nothing >> > before and after the process method >> > + */ >> > +public final class NoopOperatorCallback implements OperatorCallback { >> > + >> > + @Override >> > + public Tuple beforeProcess(Tuple tuple, MessageCollector collector, >> > TaskCoordinator coordinator) { >> > + return tuple; >> > + } >> > + >> > + @Override >> > + public Relation beforeProcess(Relation rel, MessageCollector >> collector, >> > TaskCoordinator coordinator) { >> > + return rel; >> > + } >> > + >> > + @Override >> > + public Tuple afterProcess(Tuple tuple, MessageCollector collector, >> > TaskCoordinator coordinator) { >> > + return tuple; >> > + } >> > + >> > + @Override >> > + public Relation afterProcess(Relation rel, MessageCollector >>collector, >> > TaskCoordinator coordinator) { >> > + return rel; >> > + } >> > + >> > +} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop >>ology.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop >>ology.java >> > new file mode 100644 >> > index 0000000..8b70092 >> > --- /dev/null >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop >>ology.java >> > @@ -0,0 +1,53 @@ >> > +/* >> > + * Licensed to the Apache Software Foundation (ASF) under one >> > + * or more contributor license agreements. See the NOTICE file >> > + * distributed with this work for additional information >> > + * regarding copyright ownership. The ASF licenses this file >> > + * to you under the Apache License, Version 2.0 (the >> > + * "License"); you may not use this file except in compliance >> > + * with the License. You may obtain a copy of the License at >> > + * >> > + * http://www.apache.org/licenses/LICENSE-2.0 >> > + * >> > + * Unless required by applicable law or agreed to in writing, >> > + * software distributed under the License is distributed on an >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > + * KIND, either express or implied. See the License for the >> > + * specific language governing permissions and limitations >> > + * under the License. >> > + */ >> > +package org.apache.samza.sql.operators; >> > + >> > +import java.util.Iterator; >> > + >> > +import org.apache.samza.sql.api.data.EntityName; >> > +import org.apache.samza.sql.api.operators.OperatorSink; >> > +import org.apache.samza.sql.api.operators.OperatorSource; >> > +import org.apache.samza.sql.api.operators.SimpleOperator; >> > + >> > + >> > +/** >> > + * This class implements a partially completed {@link >> > org.apache.samza.sql.operators.factory.TopologyBuilder} that >>signifies a >> > partially completed >> > + * topology that the current operator has unbounded input stream that >> can >> > be attached to other operators' output >> > + */ >> > +public class OperatorTopology implements OperatorSource, >>OperatorSink { >> > + >> > + private final EntityName name; >> > + private final SimpleRouter router; >> > + >> > + public OperatorTopology(EntityName name, SimpleRouter router) { >> > + this.name = name; >> > + this.router = router; >> > + } >> > + >> > + @Override >> > + public Iterator<SimpleOperator> opIterator() { >> > + return this.router.iterator(); >> > + } >> > + >> > + @Override >> > + public ntityName getName() { >> > + return this.name; >> > + } >> > + >> > +} >> > >> > >> > >> >http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera >>torImpl.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera >>torImpl.java >> > new file mode 10644 >> > index 0000000..423880b >> > --- /dev/null >> > +++ >> > >> >>b/samza-sql-core/src/main/java/rg/apache/samza/sql/operators/SimpleOpera >>torImpl.java >> > @@ -0,0 +1,147 @@ >> > +/* >> > + * Licensed to the Apache Software Foundation (ASF) under one >> > + * or more contributor license agreements. See the NOTICE file >> > + * distributed with this work for additional information >> > + * regarding copyright ownership. The ASF licenses this file >> > + * to you under the Apache License, Version 2.0 (the >> > + * "License"); you may not use this file except in compliance >> > + * with the License. You may obtain a copy of the License at >> > + * >> > + * http://www.apache.org/licenses/LICENSE-2.0 >> > + * >> > + * Unless required by applicable law or agreed to in writing, >> > + * software distributed under the License is distributed on an >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > + * KIND, either express or implied. See the License for the >> > + * specific language governing permissions and limitations >> > + * under the License. >> > + */ >> > + >> > +package org.apache.samza.sql.operators; >> > + >> > +import org.apache.samza.sql.api.data.Relation; >> > +import org.apache.samza.sql.api.data.Tuple; >> > +import org.apache.samza.sql.api.operators.OperatorCallback; >> > +import org.aache.samza.sql.api.operators.OperatorSpec; >> > +import org.apache.samza.sql.api.operators.SimpleOperator; >> > +import org.apache.samza.task.MessageCollector; >> > +import org.apache.samza.task.TaskCoordinator; >> > +import org.apache.samza.task.sql.SimpleMessageCollector; >> > + >> > + >> > +/** >> > + * An abstract class that encapsulate the basic information and >>methods >> > that all operator classes should implement. >> > + * It implements the interface {@link >> > org.apache.samza.sql.api.operators.SimpleOperator} >> > + */ >> > +public abstract class SimpleOperatorImpl implements SimpleOperator { >> > + /** >> > + * The specification of this operator >> > + */ >> > + private final OperatorSpec spec; >> > + >> > + /** >> > +<<<<<<<HEAD >> > + * The callback object >> > +======= >> > + * The callback function >> > +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of >> > callbacks w/o inheriting and creating many sub-classes from operators >> > + */ >> > + private final OperatorCallback callback; >> > + >> > + /** >> > + * Ctor of {@code SimpleOperatorImpl} class >> > + * >> > + * @param spec The specification of this operator >> > + */ >> > + public SimpleOperatorImpl(OperatorSpec spec) { >> > + this(spec, new NoopOperatorCallback()); >> > + } >> > + >> > + public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback >> callback) >> > { >> > + this.spec = spec; >> > + this.callback = callback; >> > + } >> > + >> > + @Override >> > + public OperatorSpec getSpec() { >> > + return this.spec; >> > + } >> > + >> > + /** >> > + * This method is made final s.t. the sequence of invocations >>between >> > {@link >> > >> >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relatio >>n, >> > MessageCollector, TaskCoordinator)} >> > + * and real processing of the input relation is fixed. >> > + */ >> > + @Override >> > + final public void process(Relation deltaRelation, MessageCollector >> > collector, TaskCoordinator coordinator) >> > + throws Exception { >> > + Relation rel = this.callback.beforeProcess(deltaRelation, >>collector, >> > coordinator); >> > + if (rel == null) { >> > + return; >> > + } >> > + this.realProcess(rel, getCollector(collector, coordinator), >> > coordinator); >> > + } >> > + >> > + /** >> > + * This method is made final s.t. the sequence of invocations >>between >> > {link >> > >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, >> > MessageCollector, TaskCoordinator)} >> > + * and real processing of the input tuple is fixed. >> > + */ >>> + @Override >> > + final public void process(Tuple tuple, MessageCollector collector, >> > TaskCoordinator coordinator) throws Exception { >> > + Tuple itupl = this.callback.beforeProcess(tuple, collector, >> > coordinator); >> > + if (ituple == null) { >> > + return; >> > + } >> > + this.realProcess(ituple, getCollector(collector, coordinator), >> > coordinator); >> > + } >> > + >> > + /** >> > + * This method is made final s.t. we enforce the invocation of >>{@code >> > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} >> bfore >> > doing anything futher >> > + */ >> > + @Override >> > + final public void refresh(long timeNano, MessageCollector >>collector, >> > TaskCoordinator coordinator) throws Exception { >> > + this.realRefresh(timeNano, getCollector(collecto, coordinator), >> > coordinator); >> > + } >> > + >> > + private SimpleMessageCollector getCollector(MessageCollector >> collector, >> > TaskCoordinator coordinator) { >> > + if (!(collector instanceof SimpleMessageCollector)) { >> > + return new SimpleMessageCollector(collector, coordnator, >> > this.callback); >> > + } else { >> > + ((SimpleMessageCollector) >> collector).switchCallback(this.callback); >> > + return (SimpleMessageCollector) collector; >> > + } >> > + } >> > + >> > + /** >> > + * Method to be overriden by each specific implementation class of >> > operator to handle timeout event >> > + * >> > + * @param timeNano The time in nanosecond when the timeout event >> > occurred >> > + * @param collector The {@link >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context >> > + * @param coordinator The {@link >> org.apache.samza.task.TaskCoordinator} >> > in the context >> > + * @throws Exception Throws exception if failed to refresh the >>results >> > + */ >> > + protected abstract void realRefresh(long timeNano, >> > SimpleMessageCollector collector, TaskCoordinator coordinator) >> > + throws Exception; >> > + >> > + /** >> > + * Method to be overriden by each specific implementation class of >> > operator to perform relational logic operation on an input {@link >> > org.apache.samza.sql.api.data.Relation} >> > + * >> > + * @param rel The input relation >> > + * @param collector The {@link >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context >> > + * @param coordinator The {@link >> org.apache.samza.task.TaskCoordinator} >> > in the context >> > + * @throws Exception Throws exception if failed to process >> > + */ >> > + protected abstract void realProcess(Relation rel, >> > SimpleMessageCollector collector, TaskCoordinator coordinator) >> > + throws Exceptio; >> > + >> > + /** >> > + * Method to be overriden by each specific implementation class of >> > operator to perform relational logic operation on an input {@ink >> > org.apache.samza.sql.api.data.Tuple} >> > + * >> > + * @param ituple The input tuple > > + * @param collector The {@link >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context >> > + * @param coordinator The {@link >> org.apache.samza.task.TaskCoordinator} >> > in the context >> > + * @throws Exception Throws exception if failed to process >> > + */ >> > + protected abstract void realProcess(Tuple ituple, >> > SimpleMessageCollector collector, TaskCoordinator coordinator) >> > + throws Exception; >> > + >> > +} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera >>torSpec.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera >>torSpec.java >> > new file mode 100644 >> > index 0000000..691e543 >> > --- /dev/null >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera >>torSpec.java >> > @@ -0,0 +1,106 @@ >> > +/* >> > + * Licensed to the Apache Software Foundation (ASF) under one >> > + * or more contributor license agreements. See the NOTICE file >> > + * distributed with this work for additional information >> > + * regarding copyright ownership. The ASF licenses this file >> > + * to you under the Apache License, Version 2.0 (the >> > + * "License"); you may not use this file except in compliance >> > + * with the License. You may obtain a copy of the License at >> > + * >> > + * http://www.apache.org/licenses/LICENSE-2.0 >> > + * >> > + * Unless required by applicable law or agreed to in writing, >> > + * software distributed under the License is distributed on an >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > + * KIND, either express or implied. See the License for the >> > + * specific language governing permissions and limitations >> > + * under the License. >> > + */ >> > +package org.apache.samza.sql.operators; >> > + >> > +import java.util.ArrayList; >> > +import java.util.List; >> > + >> > +import org.apache.samza.sql.api.data.EntityName; >> > +import org.apache.samza.sql.api.operators.OperatorSpec; >> > + >> > + >> > +/** >> > + * An abstract class that encapsulate the basic information and >>methods >> > that all specification of operators should implement. >> > + * It implements {@link >>org.apache.samza.sql.api.operators.OperatorSpec} >> > + */ >> > +public abstract class SimpleOperatorSpec implements OperatorSpec { >> > + /** >> > + * The identifier of the corresponding operator >> > + */ >> > + private final String id; >> > + >> > + /** >> > + * The list of input entity names of the corresponding operator >> > + */ >> > + private final List<EntityName> inputs = new >>ArrayList<EntityName>(); >> > + >> > + /** >> > + * The list of output entity names of the corresponding operator >> > + */ >> > + private final List<EntityName> outputs = new >>ArrayList<EntityName>(); >> > + >> > + /** >> > + * Ctor of the {@code SimpleOperatorSpec} for simple {@link >> > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and >>one >> > output >> > + * >> > + * @param id Unique identifier of the {@link >> > org.apache.samza.sql.api.operators.SimpleOperator} object >> > + * @param input The only input entity >> > + * @param output The only output entity >> > + */ >> > + public SimpleOperatorSpec(String id, EntityName input, EntityName >> > output) { >> > + this.id = id; >> > + this.inputs.add(input); >> > + this.outputs.add(output); >> > + } >> > + >> > + /** >> > + * Ctor of {@code SimpleOperatorSpec} with general format: m inputs >> and >> > n outputs >> > + * >> > + * @param id Unique identifier of the {@link >> > org.apache.samza.sql.api.operators.SimpleOperator} object >> > + * @param inputs The list of input entities >> > + * @param output The list of output entities >> > + */ >> > + public SimpleOperatorSpec(String id, List<EntityName> inputs, >> > EntityName output) { >> > + this.id = id; >> > + this.inputs.addAll(inputs); >> > + this.outputs.add(output); >> > + } >> > + >> > + @Override >> > + public String getId() { >> > + return this.id; >> > + } >> > + >> > + @Override >> > + public List<EntityName> getInputNames() { >> > + return this.inputs; >> > + } >> > + >> > + @Override >> > + public List<EntityName> getOutputNames() { >> > + return this.outputs; >> > + } >> > + >> > + /** >> > + * Method to get the first output entity >> > + * >> > + * @return The first output entity name >> > + */ >> > + public EntityName getOutputName() { >> > + return this.outputs.get(0); >> > + } >> > + >> > + /** >> > + * Method to get the first input entity >> > + * >> > + * @return The first input entity name >> > + */ >> > + public EntityName getInputName() { >> > + return this.inputs.get(0); >> > + } >> > +} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute >>r.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute >>r.java >> > new file mode 100644 >> > index 0000000..2d9a1db >> > --- /dev/null >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute >>r.java >> > @@ -0,0 +1,141 @@ >> > +/* >> > + * Licensed to the Apache Software Foundation (ASF) under one >> > + * or more contributor license agreements. See the NOTICE file >> > + * distributed with this work for additional information >> > + * regarding copyright ownership. The ASF licenses this file >> > + * to you under the Apache License, Version 2.0 (the >> > + * "License"); you may not use this file except in compliance >> > + * with the License. You may obtain a copy of the License at >> > + * >> > + * http://www.apache.org/licenses/LICENSE-2.0 >> > + * >> > + * Unless required by applicable law or agreed to in writing, >> > + * software distributed under the License is distributed on an >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > + * KIND, either express or implied. See the License for the >> > + * specific language governing permissions and limitations >> > + * under the License. >> > + */ >> > + >> > +package org.apache.samza.sql.operators; >> > + >> > +import java.util.ArrayList; >> > +import java.util.HashMap; >> > +import java.util.HashSet; >> > +import java.util.Iterator; >> > +import java.util.List; >> > +import java.util.Map; >> > +import java.util.Set; >> > + >> > +import org.apache.samza.config.Config; >> > +import org.apache.samza.sql.api.data.EntityName; >> > +import org.apache.samza.sql.api.data.Relation; >> > +import org.apache.samza.sql.api.data.Tuple; >> > +import org.apache.samza.sql.api.operators.Operator; >> > +import org.apache.samza.sql.api.operators.OperatorRouter; >> > +import org.apache.samza.sql.api.operators.SimpleOperator; >> > +import org.apache.samza.task.MessageCollector; >> > +import org.apache.samza.task.TaskContext; >> > +import org.apache.samza.task.TaskCoordinator; >> > +import org.apache.samza.task.sql.RouterMessageCollector; >> > + >> > + >> > +/** >> > + * Example implementation of {@link >> > org.apache.samza.sql.api.operators.OperatorRouter} >> > + * >> > + */ >> > +public final class SimpleRouter implements OperatorRouter { >> > + /** >> > + * List of operators added to the {@link >> > org.apache.samza.sql.api.operators.OperatorRouter} >> > + */ >> > + private List<SimpleOperator> operators = new >> > ArrayList<SimpleOperator>(); >> > + >> > + @SuppressWarnings("rawtypes") >> > + /** >> > + * Map of {@link org.apache.samza.sql.api.data.EntityName} to the >>list >> > of operators associated with it >> > + */ >> > + private Map<EntityName, List> nextOps = new HashMap<EntityName, >> List>(); >> > + >> > + /** >> > + * Set of {@link org.apache.samza.sql.api.data.EntityName} as >>inputs >> to >> > this {@code SimpleRouter} >> > + */ >> > + private Set<EntityName> inputEntities = new HashSet<EntityName>(); >> > + >> > + /** >> > + * Set of entities that are not input entities to this {@code >> > SimpleRouter} >> > + */ >> > + private Set<EntityName> outputEntities = new HashSet<EntityName>(); >> > + >> > + @SuppressWarnings("unchecked") >> > + private void addOperator(EntityName input, SimpleOperator nextOp) { >> > + if (nextOps.get(input) == null) { >> > + nextOps.put(input, new ArrayList<Operator>()); >> > + } >> > + nextOps.get(input).add(nextOp); >> > + operators.add(nextOp); >> > + // get the operator spec >> > + for (EntityName output : nextOp.getSpec().getOutputNames()) { >> > + if (inputEntities.contains(output)) { >> > + inputEntities.remove(output); >> > + } >> > + outputEntities.add(output); >> > + } >> > + if (!outputEntities.contains(input)) { >> > + inputEntities.add(input); >> > + } >> > + } >> > + >> > + @Override >> > + @SuppressWarnings("unchecked") >> > + public List<SimpleOperator> getNextOperators(EntityName entity) { >> > + return nextOps.get(entity); >> > + } >> > + >> > + @Override >> > + public void addOperator(SimpleOperator nextOp) { >> > + List<EntityName> inputs = nextOp.getSpec().getInputNames(); >> > + for (EntityName input : inputs) { >> > + addOperator(input, nextOp); >> > + } >> > + } >> > + >> > + @Override >> > + public void init(Config config, TaskContext context) throws >>Exception >> { >> > + for (SimpleOperator op : this.operators) { >> > + op.init(config, context); >> > + } >> > + } >> > + >> > + @Override >> > + public void process(Tuple ituple, MessageCollector collector, >> > TaskCoordinator coordinator) throws Exception { >> > + MessageCollector opCollector = new >>RouterMessageCollector(collector, >> > coordinator, this); >> > + for (Iterator<SimpleOperator> iter = >> > this.getNextOperators(ituple.getEntityName()).iterator(); >> iter.hasNext();) { >> > + iter.next().process(ituple, opCollector, coordinator); >> > + } >> > + } >> > + >> > + @SuppressWarnings("rawtypes") >> > + @Override >> > + public void process(Relation deltaRelation, MessageCollector >> collector, >> > TaskCoordinator coordinator) throws Exception { >> > + MessageCollector opCollector = new >>RouterMessageCollector(collector, >> > coordinator, this); >> > + for (Iterator<SimpleOperator> iter = >> > this.getNextOperators(deltaRelation.getName()).iterator(); >> iter.hasNext();) >> > { >> > + iter.next().process(deltaRelation, opCollector, coordinator); >> > + } >> > + } >> > + >> > + @Override >> > + public void refresh(long nanoSec, MessageCollector collector, >> > TaskCoordinator coordinator) throws Exception { >> > + MessageCollector opCollector = new >>RouterMessageCollector(collector, >> > coordinator, this); >> > + for (EntityName entity : inputEntities) { >> > + for (Iterator<SimpleOperator> iter = >> > this.getNextOperators(entity).iterator(); iter.hasNext();) { >> > + iter.next().refresh(nanoSec, opCollector, coordinator); >> > + } >> > + } >> > + } >> > + >> > + @Override >> > + public Iterator<SimpleOperator> iterator() { >> > + return this.operators.iterator(); >> > + } >> > + >> > +} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallbac >>k.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo >>pOperatorCallback.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo >>pOperatorCallback.java >> > deleted file mode 100644 >> > index c3d2266..0000000 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo >>pOperatorCallback.java >> > +++ /dev/null >> > @@ -1,50 +0,0 @@ >> > -/* >> > - * Licensed to the Apache Software Foundation (ASF) under one >> > - * or more contributor license agreements. See the NOTICE file >> > - * distributed with this work for additional information >> > - * regarding copyright ownership. The ASF licenses this file >> > - * to you under the Apache License, Version 2.0 (the >> > - * "License"); you may not use this file except in compliance >> > - * with the License. You may obtain a copy of the License at >> > - * >> > - * http://www.apache.org/licenses/LICENSE-2.0 >> > - * >> > - * Unless required by applicable law or agreed to in writing, >> > - * software distributed under the License is distributed on an >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > - * KIND, either express or implied. See the License for the >> > - * specific language governing permissions and limitations >> > - * under the License. >> > - */ >> > -package org.apache.samza.sql.operators.factory; >> > - >> > -import org.apache.samza.sql.api.data.Relation; >> > -import org.apache.samza.sql.api.data.Tuple; >> > -import org.apache.samza.sql.api.operators.OperatorCallback; >> > -import org.apache.samza.task.MessageCollector; >> > -import org.apache.samza.task.TaskCoordinator; >> > - >> > - >> > -public final class NoopOperatorCallback implements OperatorCallback { >> > - >> > - @Override >> > - public Tuple beforeProcess(Tuple tuple, MessageCollector collector, >> > TaskCoordinator coordinator) { >> > - return tuple; >> > - } >> > - >> > - @Override >> > - public Relation beforeProcess(Relation rel, MessageCollector >> collector, >> > TaskCoordinator coordinator) { >> > - return rel; >> > - } >> > - >> > - @Override >> > - public Tuple afterProcess(Tuple tuple, MessageCollector collector, >> > TaskCoordinator coordinator) { >> > - return tuple; >> > - } >> > - >> > - @Override >> > - public Relation afterProcess(Relation rel, MessageCollector >>collector, >> > TaskCoordinator coordinator) { >> > - return rel; >> > - } >> > - >> > -} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl. >>java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim >>pleOperatorImpl.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim >>pleOperatorImpl.java >> > deleted file mode 100644 >> > index e66451f..0000000 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim >>pleOperatorImpl.java >> > +++ /dev/null >> > @@ -1,136 +0,0 @@ >> > -/* >> > - * Licensed to the Apache Software Foundation (ASF) under one >> > - * or more contributor license agreements. See the NOTICE file >> > - * distributed with this work for additional information >> > - * regarding copyright ownership. The ASF licenses this file >> > - * to you under the Apache License, Version 2.0 (the >> > - * "License"); you may not use this file except in compliance >> > - * with the License. You may obtain a copy of the License at >> > - * >> > - * http://www.apache.org/licenses/LICENSE-2.0 >> > - * >> > - * Unless required by applicable law or agreed to in writing, >> > - * software distributed under the License is distributed on an >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > - * KIND, either express or implied. See the License for the >> > - * specific language governing permissions and limitations >> > - * under the License. >> > - */ >> > - >> > -package org.apache.samza.sql.operators.factory; >> > - >> > -import org.apache.samza.sql.api.data.Relation; >> > -import org.apache.samza.sql.api.data.Tuple; >> > -import org.apache.samza.sql.api.operators.OperatorCallback; >> > -import org.apache.samza.sql.api.operators.OperatorSpec; >> > -import org.apache.samza.sql.api.operators.SimpleOperator; >> > -import org.apache.samza.task.MessageCollector; >> > -import org.apache.samza.task.TaskCoordinator; >> > -import org.apache.samza.task.sql.SimpleMessageCollector; >> > - >> > - >> > -/** >> > - * An abstract class that encapsulate the basic information and >>methods >> > that all operator classes should implement. >> > - * It implements the interface {@link >> > org.apache.samza.sql.api.operators.SimpleOperator} >> > - * >> > - */ >> > -public abstract class SimpleOperatorImpl implements SimpleOperator { >> > - /** >> > - * The specification of this operator >> > - */ >> > - private final OperatorSpec spec; >> > - >> > - /** >> > - * The callback function >> > - */ >> > - private final OperatorCallback callback; >> > - >> > - /** >> > - * Ctor of {@code SimpleOperatorImpl} class >> > - * >> > - * @param spec The specification of this operator >> > - */ >> > - public SimpleOperatorImpl(OperatorSpec spec) { >> > - this(spec, new NoopOperatorCallback()); >> > - } >> > - >> > - public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback >> callback) >> > { >> > - this.spec = spec; >> > - this.callback = callback; >> > - } >> > - >> > - @Override >> > - public OperatorSpec getSpec() { >> > - return this.spec; >> > - } >> > - >> > - /** >> > - * This method is made final s.t. the sequence of invocations >>between >> > {@link >> > >> >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relatio >>n, >> > MessageCollector, TaskCoordinator)} >> > - * and real processing of the input relation is fixed. >> > - */ >> > - @Override >> > - final public void process(Relation deltaRelation, MessageCollector >> > collector, TaskCoordinator coordinator) >> > - throws Exception { >> > - Relation rel = this.callback.beforeProcess(deltaRelation, >>collector, >> > coordinator); >> > - if (rel == null) { >> > - return; >> > - } >> > - this.realProcess(rel, getCollector(collector, coordinator), >> > coordinator); >> > - } >> > - >> > - /** >> > - * This method is made final s.t. the sequence of invocations >>between >> > {@link >> > >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, >> > MessageCollector, TaskCoordinator)} >> > - * and real processing of the input tuple is fixed. >> > - */ >> > - @Override >> > - final public void process(Tuple tuple, MessageCollector collector, >> > TaskCoordinator coordinator) throws Exception { >> > - Tuple ituple = this.callback.beforeProcess(tuple, collector, >> > coordinator); >> > - if (ituple == null) { >> > - return; >> > - } >> > - this.realProcess(ituple, getCollector(collector, coordinator), >> > coordinator); >> > - } >> > - >> > - /** >> > - * This method is made final s.t. we enforce the invocation of >>{@code >> > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} >> before >> > doing anything futher >> > - */ >> > - @Override >> > - final public void refresh(long timeNano, MessageCollector >>collector, >> > TaskCoordinator coordinator) throws Exception { >> > - this.realRefresh(timeNano, getCollector(collector, coordinator), >> > coordinator); >> > - } >> > - >> > - private SimpleMessageCollector getCollector(MessageCollector >> collector, >> > TaskCoordinator coordinator) { >> > - if (!(collector instanceof SimpleMessageCollector)) { >> > - return new SimpleMessageCollector(collector, coordinator, >> > this.callback); >> > - } else { >> > - ((SimpleMessageCollector) >> > collector).switchOperatorCallback(this.callback); >> > - return (SimpleMessageCollector) collector; >> > - } >> > - } >> > - >> > - /** >> > - * Method to be overriden by each specific implementation class of >> > operator to handle timeout event >> > - * >> > - * @param timeNano The time in nanosecond when the timeout event >> > occurred >> > - * @param collector The {@link >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context >> > - * @param coordinator The {@link >> org.apache.samza.task.TaskCoordinator} >> > in the context >> > - * @throws Exception Throws exception if failed to refresh the >>results >> > - */ >> > - protected abstract void realRefresh(long timeNano, >> > SimpleMessageCollector collector, TaskCoordinator coordinator) >> > - throws Exception; >> > - >> > - /** >> > - * Method to be overriden by each specific implementation class of >> > operator to perform relational logic operation on an input {@link >> > org.apache.samza.sql.api.data.Relation} >> > - * >> > - * @param rel The input relation >> > - * @param collector The {@link >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context >> > - * @param coordinator The {@link >> org.apache.samza.task.TaskCoordinator} >> > in the context >> > - * @throws Exception >> > - */ >> > - protected abstract void realProcess(Relation rel, >> > SimpleMessageCollector collector, TaskCoordinator coordinator) >> > - throws Exception; >> > - >> > - protected abstract void realProcess(Tuple ituple, >> > SimpleMessageCollector collector, TaskCoordinator coordinator) >> > - throws Exception; >> > - >> > -} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec. >>java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim >>pleOperatorSpec.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim >>pleOperatorSpec.java >> > deleted file mode 100644 >> > index 56753b6..0000000 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim >>pleOperatorSpec.java >> > +++ /dev/null >> > @@ -1,106 +0,0 @@ >> > -/* >> > - * Licensed to the Apache Software Foundation (ASF) under one >> > - * or more contributor license agreements. See the NOTICE file >> > - * distributed with this work for additional information >> > - * regarding copyright ownership. The ASF licenses this file >> > - * to you under the Apache License, Version 2.0 (the >> > - * "License"); you may not use this file except in compliance >> > - * with the License. You may obtain a copy of the License at >> > - * >> > - * http://www.apache.org/licenses/LICENSE-2.0 >> > - * >> > - * Unless required by applicable law or agreed to in writing, >> > - * software distributed under the License is distributed on an >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > - * KIND, either express or implied. See the License for the >> > - * specific language governing permissions and limitations >> > - * under the License. >> > - */ >> > -package org.apache.samza.sql.operators.factory; >> > - >> > -import java.util.ArrayList; >> > -import java.util.List; >> > - >> > -import org.apache.samza.sql.api.data.EntityName; >> > -import org.apache.samza.sql.api.operators.OperatorSpec; >> > - >> > - >> > -/** >> > - * An abstract class that encapsulate the basic information and >>methods >> > that all specification of operators should implement. >> > - * It implements {@link >>org.apache.samza.sql.api.operators.OperatorSpec} >> > - */ >> > -public abstract class SimpleOperatorSpec implements OperatorSpec { >> > - /** >> > - * The identifier of the corresponding operator >> > - */ >> > - private final String id; >> > - >> > - /** >> > - * The list of input entity names of the corresponding operator >> > - */ >> > - private final List<EntityName> inputs = new >>ArrayList<EntityName>(); >> > - >> > - /** >> > - * The list of output entity names of the corresponding operator >> > - */ >> > - private final List<EntityName> outputs = new >>ArrayList<EntityName>(); >> > - >> > - /** >> > - * Ctor of the {@code SimpleOperatorSpec} for simple {@link >> > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and >>one >> > output >> > - * >> > - * @param id Unique identifier of the {@link >> > org.apache.samza.sql.api.operators.SimpleOperator} object >> > - * @param input The only input entity >> > - * @param output The only output entity >> > - */ >> > - public SimpleOperatorSpec(String id, EntityName input, EntityName >> > output) { >> > - this.id = id; >> > - this.inputs.add(input); >> > - this.outputs.add(output); >> > - } >> > - >> > - /** >> > - * Ctor of {@code SimpleOperatorSpec} with general format: m inputs >> and >> > n outputs >> > - * >> > - * @param id Unique identifier of the {@link >> > org.apache.samza.sql.api.operators.SimpleOperator} object >> > - * @param inputs The list of input entities >> > - * @param output The list of output entities >> > - */ >> > - public SimpleOperatorSpec(String id, List<EntityName> inputs, >> > EntityName output) { >> > - this.id = id; >> > - this.inputs.addAll(inputs); >> > - this.outputs.add(output); >> > - } >> > - >> > - @Override >> > - public String getId() { >> > - return this.id; >> > - } >> > - >> > - @Override >> > - public List<EntityName> getInputNames() { >> > - return this.inputs; >> > - } >> > - >> > - @Override >> > - public List<EntityName> getOutputNames() { >> > - return this.outputs; >> > - } >> > - >> > - /** >> > - * Method to get the first output entity >> > - * >> > - * @return The first output entity name >> > - */ >> > - public EntityName getOutputName() { >> > - return this.outputs.get(0); >> > - } >> > - >> > - /** >> > - * Method to get the first input entity >> > - * >> > - * @return The first input entity name >> > - */ >> > - public EntityName getInputName() { >> > - return this.inputs.get(0); >> > - } >> > -} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim >>pleRouter.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim >>pleRouter.java >> > deleted file mode 100644 >> > index e570897..0000000 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim >>pleRouter.java >> > +++ /dev/null >> > @@ -1,136 +0,0 @@ >> > -/* >> > - * Licensed to the Apache Software Foundation (ASF) under one >> > - * or more contributor license agreements. See the NOTICE file >> > - * distributed with this work for additional information >> > - * regarding copyright ownership. The ASF licenses this file >> > - * to you under the Apache License, Version 2.0 (the >> > - * "License"); you may not use this file except in compliance >> > - * with the License. You may obtain a copy of the License at >> > - * >> > - * http://www.apache.org/licenses/LICENSE-2.0 >> > - * >> > - * Unless required by applicable law or agreed to in writing, >> > - * software distributed under the License is distributed on an >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > - * KIND, either express or implied. See the License for the >> > - * specific language governing permissions and limitations >> > - * under the License. >> > - */ >> > - >> > -package org.apache.samza.sql.operators.factory; >> > - >> > -import java.util.ArrayList; >> > -import java.util.HashMap; >> > -import java.util.HashSet; >> > -import java.util.Iterator; >> > -import java.util.List; >> > -import java.util.Map; >> > -import java.util.Set; >> > - >> > -import org.apache.samza.config.Config; >> > -import org.apache.samza.sql.api.data.EntityName; >> > -import org.apache.samza.sql.api.data.Relation; >> > -import org.apache.samza.sql.api.data.Tuple; >> > -import org.apache.samza.sql.api.operators.Operator; >> > -import org.apache.samza.sql.api.operators.OperatorRouter; >> > -import org.apache.samza.sql.api.operators.SimpleOperator; >> > -import org.apache.samza.task.MessageCollector; >> > -import org.apache.samza.task.TaskContext; >> > -import org.apache.samza.task.TaskCoordinator; >> > -import org.apache.samza.task.sql.RouterMessageCollector; >> > - >> > - >> > -/** >> > - * Example implementation of {@link >> > org.apache.samza.sql.api.operators.OperatorRouter} >> > - * >> > - */ >> > -public final class SimpleRouter implements OperatorRouter { >> > - /** >> > - * List of operators added to the {@link >> > org.apache.samza.sql.api.operators.OperatorRouter} >> > - */ >> > - private List<SimpleOperator> operators = new >> > ArrayList<SimpleOperator>(); >> > - >> > - @SuppressWarnings("rawtypes") >> > - /** >> > - * Map of {@link org.apache.samza.sql.api.data.EntityName} to the >>list >> > of operators associated with it >> > - */ >> > - private Map<EntityName, List> nextOps = new HashMap<EntityName, >> List>(); >> > - >> > - /** >> > - * Set of {@link org.apache.samza.sql.api.data.EntityName} as >>inputs >> to >> > this {@code SimpleRouter} >> > - */ >> > - private Set<EntityName> inputEntities = new HashSet<EntityName>(); >> > - >> > - /** >> > - * Set of entities that are not input entities to this {@code >> > SimpleRouter} >> > - */ >> > - private Set<EntityName> outputEntities = new HashSet<EntityName>(); >> > - >> > - @SuppressWarnings("unchecked") >> > - private void addOperator(EntityName input, SimpleOperator nextOp) { >> > - if (nextOps.get(input) == null) { >> > - nextOps.put(input, new ArrayList<Operator>()); >> > - } >> > - nextOps.get(input).add(nextOp); >> > - operators.add(nextOp); >> > - // get the operator spec >> > - for (EntityName output : nextOp.getSpec().getOutputNames()) { >> > - if (inputEntities.contains(output)) { >> > - inputEntities.remove(output); >> > - } >> > - outputEntities.add(output); >> > - } >> > - if (!outputEntities.contains(input)) { >> > - inputEntities.add(input); >> > - } >> > - } >> > - >> > - @Override >> > - @SuppressWarnings("unchecked") >> > - public List<SimpleOperator> getNextOperators(EntityName entity) { >> > - return nextOps.get(entity); >> > - } >> > - >> > - @Override >> > - public void addOperator(SimpleOperator nextOp) { >> > - List<EntityName> inputs = nextOp.getSpec().getInputNames(); >> > - for (EntityName input : inputs) { >> > - addOperator(input, nextOp); >> > - } >> > - } >> > - >> > - @Override >> > - public void init(Config config, TaskContext context) throws >>Exception >> { >> > - for (SimpleOperator op : this.operators) { >> > - op.init(config, context); >> > - } >> > - } >> > - >> > - @Override >> > - public void process(Tuple ituple, MessageCollector collector, >> > TaskCoordinator coordinator) throws Exception { >> > - MessageCollector opCollector = new >>RouterMessageCollector(collector, >> > coordinator, this); >> > - for (Iterator<SimpleOperator> iter = >> > this.getNextOperators(ituple.getEntityName()).iterator(); >> iter.hasNext();) { >> > - iter.next().process(ituple, opCollector, coordinator); >> > - } >> > - } >> > - >> > - @SuppressWarnings("rawtypes") >> > - @Override >> > - public void process(Relation deltaRelation, MessageCollector >> collector, >> > TaskCoordinator coordinator) throws Exception { >> > - MessageCollector opCollector = new >>RouterMessageCollector(collector, >> > coordinator, this); >> > - for (Iterator<SimpleOperator> iter = >> > this.getNextOperators(deltaRelation.getName()).iterator(); >> iter.hasNext();) >> > { >> > - iter.next().process(deltaRelation, opCollector, coordinator); >> > - } >> > - } >> > - >> > - @Override >> > - public void refresh(long nanoSec, MessageCollector collector, >> > TaskCoordinator coordinator) throws Exception { >> > - MessageCollector opCollector = new >>RouterMessageCollector(collector, >> > coordinator, this); >> > - for (EntityName entity : inputEntities) { >> > - for (Iterator<SimpleOperator> iter = >> > this.getNextOperators(entity).iterator(); iter.hasNext();) { >> > - iter.next().refresh(nanoSec, opCollector, coordinator); >> > - } >> > - } >> > - } >> > - >> > -} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.jav >>a >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Top >>ologyBuilder.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Top >>ologyBuilder.java >> > new file mode 100644 >> > index 0000000..62b19fc >> > --- /dev/null >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Top >>ologyBuilder.java >> > @@ -0,0 +1,284 @@ >> > +/* >> > + * Licensed to the Apache Software Foundation (ASF) under one >> > + * or more contributor license agreements. See the NOTICE file >> > + * distributed with this work for additional information >> > + * regarding copyright ownership. The ASF licenses this file >> > + * to you under the Apache License, Version 2.0 (the >> > + * "License"); you may not use this file except in compliance >> > + * with the License. You may obtain a copy of the License at >> > + * >> > + * http://www.apache.org/licenses/LICENSE-2.0 >> > + * >> > + * Unless required by applicable law or agreed to in writing, >> > + * software distributed under the License is distributed on an >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >> > + * KIND, either express or implied. See the License for the >> > + * specific language governing permissions and limitations >> > + * under the License. >> > + */ >> > +package org.apache.samza.sql.operators.factory; >> > + >> > +import java.util.HashMap; >> > +import java.util.HashSet; >> > +import java.util.Iterator; >> > +import java.util.List; >> > +import java.util.Map; >> > +import java.util.Set; >> > + >> > +import org.apache.samza.sql.api.data.EntityName; >> > +import org.apache.samza.sql.api.operators.OperatorRouter; >> > +import org.apache.samza.sql.api.operators.OperatorSink; >> > +import org.apache.samza.sql.api.operators.OperatorSource; >> > +import org.apache.samza.sql.api.operators.OperatorSpec; >> > +import org.apache.samza.sql.api.operators.SimpleOperator; >> > +import org.apache.samza.sql.api.operators.SqlOperatorFactory; >> > +import org.apache.samza.sql.operators.OperatorTopology; >> > +import org.apache.samza.sql.operators.SimpleRouter; >> > + >> > + >> > +/** >> > + * This class implements a builder to allow user to create the >>operators >> > and connect them in a topology altogether. >> > + */ >> > +public class TopologyBuilder { >> > + >> > + /** >> > + * Internal {@link >>org.apache.samza.sql.api.operators.OperatorRouter} >> > object to retain the topology being created >> > + */ >> > + private SimpleRouter router; >> > + >> > + /** >> > + * The {@link >>org.apache.samza.sql.api.operators.SqlOperatorFactory} >> > object used to create operators connected in the topology >> > + */ >> > + private final SqlOperatorFactory factory; >> > + >> > + /** >> > + * The map of unbound inputs, the value is set(input_operators) >> > + */ >> > + private Map<EntityName, Set<OperatorSpec>> unboundInputs = new >> > HashMap<EntityName, Set<OperatorSpec>>(); >> > + >> > + /** >> > + * The map of unbound outputs, the value is the operator generating >> the >> > output >> > + */ >> > + private Map<EntityName, OperatorSpec> unboundOutputs = new >> > HashMap<EntityName, OperatorSpec>(); >> > + >> > + /** >> > + * The set of entities that are intermediate entities between >> operators >> > + */ >> > + private Set<EntityName> interStreams = new HashSet<EntityName>(); >> > + >> > + /** >> > + * The current operator that may have unbound input or output >> > + */ >> > + private SimpleOperator currentOp = null; >> > + >> > + /** >> > + * Private constructor of {@code TopologyBuilder} >> > + * >> > + * @param factory The {@link >> > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create >> operators >> > + */ >> > + private TopologyBuilder(SqlOperatorFactory factory) { >> > + this.router = new SimpleRouter(); >> > + this.factory = factory; >> > + } >> > + >> > + /** >> > + * Static method to create this {@code TopologyBuilder} w/ a >> customized >> > {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} >> > + * >> > + * @param factory The {@link >> > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create >> operators >> > + * @return The {@code TopologyBuilder} object >> > + */ >> > + public static TopologyBuilder create(SqlOperatorFactory factory) { >> > + return new TopologyBuilder(factory); >> > + } >> > + >> > + /** >> > + * Static method to create this {@code TopologyBuilder} >> > + * >> > + * @return The {@code TopologyBuilder} object >> > + */ >> > + public static TopologyBuilder create() { >> > + return new TopologyBuilder(new SimpleOperatorFactoryImpl()); >> > + } >> > + >> > + /** >> > + * Public method to create the next operator and attach it to the >> > output of the current operator >> > + * >> > + * @param spec The {@link >> > org.apache.samza.sql.api.operators.OperatorSpec} for the next operator >> > + * @return The updated {@code TopologyBuilder} object >> > + */ >> > + public TopologyBuilder operator(OperatorSpec spec) { >> > + // check whether it is valid to connect a new operator to the >> current >> > operator's output >> > + SimpleOperator nextOp = this.factory.getOperator(spec); >> > + return this.operator(nextOp); >> > + } >> > + >> > + /** >> > + * Public method to create the next operator and attach it to the >> > output of the current operator >> > + * >> > + * @param op The {@link >> > org.apache.samza.sql.api.operators.SimpleOperator} >> > + * @return The updated {@code TopologyBuilder} object >> > + */ >> > + public TopologyBuilder operator(SimpleOperator op) { >> > + // check whether it is valid to connect a new operator to the >> current >> > operator's output >> > + canAddOperator(op); >> > + this.addOperator(op); >> > + // advance the current operator position >> > + this.currentOp = op; >> > + return this; >> > + } >> > + >> > + /** >> > + * Public method to create a stream object that will be the source >>to >> > other operators >> > + * >> > + * @return The {@link >> > org.apache.samza.sql.api.operators.OperatorSource} that can be the >>source >> > to other operators >> > + */ >> > + public OperatorSource stream() { >> > + canCreateSource(); >> > + return new >> > OperatorTopology(this.unboundOutputs.keySet().iterator().next(), >> > this.router); >> > + } >> > + >> > + /** >> > + * Public method to create a sink object that can take input stream >> > from other operators >> > + * >> > + * @return The {@link >>org.apache.samza.sql.api.operators.OperatorSink} >> > that can be the downstream of other operators >> > + */ >> > + public OperatorSink sink() { >> > + canCreateSink(); >> > + return new >> > OperatorTopology(this.unboundInputs.keySet().iterator().next(), >> > this.router); >> > + } >> > + >> > + /** >> > + * Public method to bind the input of the current operator w/ the >> > {@link org.apache.samza.sql.api.operators.OperatorSource} object >> > + * >> > + * @param srcStream The {@link >> > org.apache.samza.sql.api.operators.OperatorSource} that the current >> > operator is going to be bound to >> > + * @return The updated {@code TopologyBuilder} object >> > + */ >> > + public TopologyBuilder bind(OperatorSource srcStream) { >> > + EntityName streamName = srcStream.getName(); >> > + if (this.unboundInputs.containsKey(streamName)) { >> > + this.unboundInputs.remove(streamName); >> > + this.interStreams.add(streamName); >> > + } else { >> > + // no input operator is waiting for the output from the >>srcStream >> > + throw new IllegalArgumentException("No operator input can be >>bound >> > to the input stream " + streamName); >> > + } >> > + // add all operators in srcStream to this topology >> > + for (Iterator<SimpleOperator> iter = srcStream.opIterator(); >> > iter.hasNext();) { >> > + this.addOperator(iter.next()); >> > + } >> > + return this; >> > + } >> > + >> > + /** >> > + * Public method to attach a {@link >> > org.apache.samza.sql.api.operators.OperatorSink} object to the output >>of >> > the current operator >> > + * >> > + * @param nextSink The {@link >> > org.apache.samza.sql.api.operators.OperatorSink} to be attached to the >> > current operator's output >> > + * @return The updated {@code TopologyBuilder} object >> > + */ >> > + public TopologyBuilder attach(OperatorSink nextSink) { >> > + EntityName streamName = nextSink.getName(); >> > + if (this.unboundOutputs.containsKey(streamName)) { >> > + this.unboundOutputs.remove(streamName); >> > + this.interStreams.add(streamName); >> > + } else { >> > + // no unbound output to attach to >> > + throw new IllegalArgumentException("No operator output found to >> > attach the sink " + streamName); >> > + } >> > + // add all operators in nextSink to the router >> > + for (Iterator<SimpleOperator> iter = nextSink.opIterator(); >> > iter.hasNext();) { >> > + this.addOperator(iter.next()); >> > + } >> > + return this; >> > + } >> > + >> > + /** >> > + * Public method to finalize the topology that should have all >>input >> > and output bound to system input and output >> > + * >> > + * @return The finalized {@link >> > org.apache.samza.sql.api.operators.OperatorRouter} object >> > + */ >> > + public OperatorRouter build() { >> > + canClose(); >> > + return router; >> > + } >> > + >> > + private TopologyBuilder addOperator(SimpleOperator nextOp) { >> > + // if input is not in the unboundOutputs and interStreams, input >>is >> > unbound >> > + for (EntityName in : nextOp.getSpec().getInputNames()) { >> > + if (this.unboundOutputs.containsKey(in)) { >> > + this.unboundOutputs.remove(in); >> > + this.interStreams.add(in); >> > + } >> > + if (!this.interStreams.contains(in) && !in.isSystemEntity()) { >> > + if (!this.unboundInputs.containsKey(in)) { >> > + this.unboundInputs.put(in, new HashSet<OperatorSpec>()); >> > + } >> > + this.unboundInputs.get(in).add(nextOp.getSpec()); >> > + } >> > + } >> > + // if output is not in the unboundInputs and interStreams, >>output is >> > unbound >> > + for (EntityName out : nextOp.getSpec().getOutputNames()) { >> > + if (this.unboundInputs.containsKey(out)) { >> > + this.unboundInputs.remove(out); >> > + this.interStreams.add(out); >> > + } >> > + if (!this.interStreams.contains(out) && !out.isSystemEntity()) >>{ >> > + this.unboundOutputs.put(out, nextOp.getSpec()); >> > + } >> > + } >> > + try { >> > + this.router.addOperator(nextOp); >> > + } catch (Exception e) { >> > + throw new RuntimeException("Failed to add operator " + >> > nextOp.getSpec().getId() + " to the topology.", e); >> > + } >> > + return this; >> > + } >> > + >> > + private void canCreateSource() { >> > + if (this.unboundInputs.size() > 0) { >> > + throw new IllegalStateException("Can't create stream when there >> are >> > unbounded input streams in the topology"); >> > + } >> > + if (this.unboundOutputs.size() != 1) { >> > + throw new IllegalStateException( >> > + "Can't create stream when the number of unbounded outputs >>is >> > not 1 in the topology"); >> > + } >> > + } >> > + >> > + private void canCreateSink() { >> > + if (this.unboundOutputs.size() > 0) { >> > + throw new IllegalStateException("Can't create sink when there >>are >> > unbounded output streams in the topology"); >> > + } >> > + if (this.unboundInputs.size() != 1) { >> > + throw new IllegalStateException( >> > + "Can't create sink when the number of unbounded input >>streams >> > is not 1 in the topology"); >> > + } >> > + } >> > + >> > + private void canAddOperator(SimpleOperator op) { >> > + if (this.currentOp == null) { >> > + return; >> > + } >> > + for (EntityName name : this.currentOp.getSpec().getInputNames()) >>{ >> > + if (this.unboundInputs.containsKey(name)) { >> > + throw new IllegalArgumentException("There are unbound input >>" + >> > name + " to the current operator " >> > + + this.currentOp.getSpec().getId() + ". Create a sink or >> call >> > bind instead"); >> > + } >> > + } >> > + List<EntityName> nextInputs = op.getSpec().getInputNames(); >> > + for (EntityName name : >>this.currentOp.getSpec().getOutputNames()) { >> > + if (!nextInputs.contains(name) && >> > this.unboundOutputs.containsKey(name)) { >> > + // the current operator's output is not in the next >>operator's >> > input list >> > + throw new IllegalArgumentException("There are unbound output >>" + >> > name + " from the current operator " >> > + + this.currentOp.getSpec().getId() >> > + + " that are not included in the next operator's inputs. >> > Create a stream or call attach instead"); >> > + } >> > + } >> > + } >> > + >> > + private void canClose() { >> > + if (!this.unboundInputs.isEmpty() || >> !this.unboundOutputs.isEmpty()) { >> > + throw new IllegalStateException( >> > + "There are input/output streams in the topology that are >>not >> > bounded. Can't build the topology yet."); >> > + } >> > + } >> > + >> > +} >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream >>StreamJoin.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream >>StreamJoin.java >> > index 2854aeb..7f5b990 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream >>StreamJoin.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream >>StreamJoin.java >> > @@ -29,7 +29,7 @@ import org.apache.samza.sql.api.data.Relation; >> > import org.apache.samza.sql.api.data.Stream; >> > import org.apache.samza.sql.api.data.Tuple; >> > import org.apache.samza.sql.api.operators.OperatorCallback; >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; >> > +import org.apache.samza.sql.operators.SimpleOperatorImpl; >> > import org.apache.samza.sql.operators.window.BoundedTimeWindow; >> > import org.apache.samza.sql.window.storage.OrderedStoreKey; >> > import org.apache.samza.storage.kv.Entry; >> > @@ -38,7 +38,6 @@ import org.apache.samza.task.TaskContext; >> > import org.apache.samza.task.TaskCoordinator; >> > import org.apache.samza.task.sql.SimpleMessageCollector; >> > >> > - >> > /** >> > * This class implements a simple stream-to-stream join >> > */ >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.j >>ava >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream >>StreamJoinSpec.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream >>StreamJoinSpec.java >> > index cc0aca0..eecff7e 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream >>StreamJoinSpec.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream >>StreamJoinSpec.java >> > @@ -19,10 +19,11 @@ >> > >> > package org.apache.samza.sql.operators.join; >> > >> > +import java.util.ArrayList; >> > import java.util.List; >> > >> > import org.apache.samza.sql.api.data.EntityName; >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; >> > +import org.apache.samza.sql.operators.SimpleOperatorSpec; >> > >> > >> > /** >> > @@ -35,4 +36,16 @@ public class StreamStreamJoinSpec extends >> > SimpleOperatorSpec { >> > // TODO Auto-generated constructor stub >> > } >> > >> > + @SuppressWarnings("serial") >> > + public StreamStreamJoinSpec(String id, List<String> inputRelations, >> > String output, List<String> joinKeys) { >> > + super(id, new ArrayList<EntityName>() { >> > + { >> > + for (String input : inputRelations) { >> > + add(EntityName.getStreamName(input)); >> > + } >> > + } >> > + }, EntityName.getStreamName(output)); >> > + // TODO Auto-generated constructor stub >> > + } >> > + >> > } >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P >>artitionOp.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P >>artitionOp.java >> > index b93d789..0cba39a 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P >>artitionOp.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P >>artitionOp.java >> > @@ -23,7 +23,7 @@ import org.apache.samza.config.Config; >> > import org.apache.samza.sql.api.data.Relation; >> > import org.apache.samza.sql.api.data.Tuple; >> > import org.apache.samza.sql.api.operators.OperatorCallback; >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; >> > +import org.apache.samza.sql.operators.SimpleOperatorImpl; >> > import org.apache.samza.storage.kv.Entry; >> > import org.apache.samza.storage.kv.KeyValueIterator; >> > import org.apache.samza.system.OutgoingMessageEnvelope; >> > @@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext; >> > import org.apache.samza.task.TaskCoordinator; >> > import org.apache.samza.task.sql.SimpleMessageCollector; >> > >> > - >> > /** >> > * This is an example build-in operator that performs a simple stream >> > re-partition operation. >> > * >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.jav >>a >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P >>artitionSpec.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P >>artitionSpec.java >> > index c47eed9..e494bff 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P >>artitionSpec.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P >>artitionSpec.java >> > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.partition; >> > >> > import org.apache.samza.sql.api.data.EntityName; >> > import org.apache.samza.sql.api.operators.OperatorSpec; >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; >> > +import org.apache.samza.sql.operators.SimpleOperatorSpec; >> > import org.apache.samza.system.SystemStream; >> > >> > >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.ja >>va >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun >>dedTimeWindow.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun >>dedTimeWindow.java >> > index d81cc93..a9a83b5 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun >>dedTimeWindow.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun >>dedTimeWindow.java >> > @@ -27,13 +27,12 @@ import org.apache.samza.sql.api.data.EntityName; >> > import org.apache.samza.sql.api.data.Relation; >> > import org.apache.samza.sql.api.data.Tuple; >> > import org.apache.samza.sql.api.operators.OperatorCallback; >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; >> > +import org.apache.samza.sql.operators.SimpleOperatorImpl; >> > import org.apache.samza.storage.kv.KeyValueIterator; >> > import org.apache.samza.task.TaskContext; >> > import org.apache.samza.task.TaskCoordinator; >> > import org.apache.samza.task.sql.SimpleMessageCollector; >> > >> > - >> > /** >> > * This class defines an example build-in operator for a fixed size >> > window operator that converts a stream to a relation >> > * >> > @@ -86,6 +85,7 @@ public class BoundedTimeWindow extends >> > SimpleOperatorImpl { >> > * @param lengthSec The window size in seconds >> > * @param input The input stream name >> > * @param output The output relation name >> > + * @param callback The user callback object >> > */ >> > public BoundedTimeWindow(String wndId, int lengthSec, String input, >> > String output, OperatorCallback callback) { >> > super(new WindowSpec(wndId, EntityName.getStreamName(input), >> > EntityName.getStreamName(output), lengthSec), callback); >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind >>owSpec.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind >>owSpec.java >> > index eec32ea..6c4eba8 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind >>owSpec.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind >>owSpec.java >> > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.window; >> > >> > import org.apache.samza.sql.api.data.EntityName; >> > import org.apache.samza.sql.api.operators.OperatorSpec; >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; >> > +import org.apache.samza.sql.operators.SimpleOperatorSpec; >> > >> > >> > /** >> > @@ -47,6 +47,11 @@ public class WindowSpec extends SimpleOperatorSpec >> > implements OperatorSpec { >> > this.wndSizeSec = lengthSec; >> > } >> > >> > + public WindowSpec(String id, int wndSize, String input) { >> > + super(id, EntityName.getStreamName(input), null); >> > + this.wndSizeSec = wndSize; >> > + } >> > + >> > /** >> > * Method to get the window state relation name >> > * >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol >>lector.java >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol >>lector.java >> > index b29838a..6950f67 100644 >> > --- >> > >> >>a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol >>lector.java >> > +++ >> > >> >>b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol >>lector.java >> > @@ -22,7 +22,7 @@ package org.apache.samza.task.sql; >> > import org.apache.samza.sql.api.data.Relation; >> > import org.apache.samza.sql.api.data.Tuple; >> > import org.apache.samza.sql.api.operators.OperatorCallback; >> > -import org.apache.samza.sql.operators.factory.NoopOperatorCallback; >> > +import org.apache.samza.sql.operators.NoopOperatorCallback; >> > import org.apache.samza.storage.kv.Entry; >> > import org.apache.samza.storage.kv.KeyValueIterator; >> > import org.apache.samza.system.OutgoingMessageEnvelope; >> > @@ -57,25 +57,38 @@ public class SimpleMessageCollector implements >> > MessageCollector { >> > * @param coordinator The {@link >> org.apache.samza.task.TaskCoordinator} >> > in the context >> > */ >> > public SimpleMessageCollector(MessageCollector collector, >> > TaskCoordinator coordinator) { >> > - this.collector = collector; >> > - this.coordinator = coordinator; >> > + this(collector, coordinator, new NoopOperatorCallback()); >> > } >> > >> > /** >> > * This method swaps the {@code callback} with the new one >> > * >> > - * <p> This method allows the {@link >> > org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when >>the >> > collector >> > - * is passed down into the next operator's context. Hence, under >>the >> > new operator's context, the correct {@link >> > >> >>org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation >>, >> > MessageCollector, TaskCoordinator)}, >> > - * and {@link >> > >>org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple, >> > MessageCollector, TaskCoordinator)} can be invoked >> > + * <p> This method allows the {@link >> > org.apache.samza.sql.api.operators.OperatorCallback} to be swapped >>when >> the >> > collector >> > + * is passed down into the next operator's context. Hence, under >>the >> > new operator's context, the correct callback functions can be invoked >> > * >> > * @param callback The new {@link >> > org.apache.samza.sql.api.operators.OperatorCallback} to be set >> > */ >> > - public void switchOperatorCallback(OperatorCallback callback) { >> > - this.callback = callback; >> > + public void switchCallback(OperatorCallback callback) { >> > + if (callback == null) { >> > + this.callback = new NoopOperatorCallback(); >> > + } else { >> > + this.callback = callback; >> > + } >> > + } >> > + >> > + /** >> > + * Method is declared to be final s.t. we enforce that the callback >> > functions are called first >> > + */ >> > + @Override >> > + final public void send(OutgoingMessageEnvelope envelope) { >> > + this.collector.send(envelope); >> > } >> > >> > /** >> > * Method is declared to be final s.t. we enforce that the callback >> > functions are called first >> > + * >> > + * @param deltaRelation The relation to be sent out >> > + * @throws Exception Throws exception if failed to send >> > */ >> > final public void send(Relation deltaRelation) throws Exception { >> > Relation rel = this.callback.afterProcess(deltaRelation, >>collector, >> > coordinator); >> > @@ -87,6 +100,9 @@ public class SimpleMessageCollector implements >> > MessageCollector { >> > >> > /** >> > * Method is declared to be final s.t. we enforce that the callback >> > functions are called first >> > + * >> > + * @param tuple The tuple to be sent out >> > + * @throws Exception Throws exception if failed to send >> > */ >> > final public void send(Tuple tuple) throws Exception { >> > Tuple otuple = this.callback.afterProcess(tuple, collector, >> > coordinator); >> > @@ -106,9 +122,4 @@ public class SimpleMessageCollector implements >> > MessageCollector { >> > protected void realSend(Tuple tuple) throws Exception { >> > this.collector.send((OutgoingMessageEnvelope) >>tuple.getMessage()); >> > } >> > - >> > - @Override >> > - public void send(OutgoingMessageEnvelope envelope) { >> > - this.collector.send(envelope); >> > - } >> > } >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper >>atorTask.java >> > >> >>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper >>atorTask.java >> > index 20dc701..7370af6 100644 >> > --- >> > >> >>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper >>atorTask.java >> > +++ >> > >> >>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper >>atorTask.java >> > @@ -22,6 +22,7 @@ package org.apache.samza.task.sql; >> > import org.apache.samza.config.Config; >> > import org.apache.samza.sql.api.data.Relation; >> > import org.apache.samza.sql.api.data.Tuple; >> > +import org.apache.samza.sql.api.operators.Operator; >> > import org.apache.samza.sql.api.operators.OperatorCallback; >> > import org.apache.samza.sql.data.IncomingMessageTuple; >> > import org.apache.samza.sql.operators.window.BoundedTimeWindow; >> > @@ -39,7 +40,7 @@ import org.apache.samza.task.WindowableTask; >> > * >> > */ >> > public class RandomWindowOperatorTask implements StreamTask, >> > InitableTask, WindowableTask { >> > - private BoundedTimeWindow wndOp; >> > + private Operator operator; >> > >> > private final OperatorCallback wndCallback = new >>OperatorCallback() { >> > >> > @@ -77,20 +78,20 @@ public class RandomWindowOperatorTask implements >> > StreamTask, InitableTask, Windo >> > public void process(IncomingMessageEnvelope envelope, >>MessageCollector >> > collector, TaskCoordinator coordinator) >> > throws Exception { >> > // based on tuple's stream name, get the window op and run >>process() >> > - wndOp.process(new IncomingMessageTuple(envelope), collector, >> > coordinator); >> > + operator.process(new IncomingMessageTuple(envelope), collector, >> > coordinator); >> > >> > } >> > >> > @Override >> > public void window(MessageCollector collector, TaskCoordinator >> > coordinator) throws Exception { >> > // based on tuple's stream name, get the window op and run >>process() >> > - wndOp.refresh(System.nanoTime(), collector, coordinator); >> > + operator.refresh(System.nanoTime(), collector, coordinator); >> > } >> > >> > @Override >> > public void init(Config config, TaskContext context) throws >>Exception >> { >> > // 1. create a fixed length 10 sec window operator >> > - this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", >> > "relation1", this.wndCallback); >> > - this.wndOp.init(config, context); >> > + this.operator = new BoundedTimeWindow("wndOp1", 10, >>"kafka:stream1", >> > "wndOutput", this.wndCallback); >> > + this.operator.init(config, context); >> > } >> > } >> > >> > >> > >> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core >>/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java >> > ---------------------------------------------------------------------- >> > diff --git >> > >> >>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja >>va >> > >> >>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja >>va >> > index 9124e3c..d65892c 100644 >> > --- >> > >> >>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja >>va >> > +++ >> > >> >>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja >>va >> > @@ -24,7 +24,7 @@ import java.util.List; >> > >> > import org.apache.samza.config.Config; >> > import org.apache.samza.sql.data.IncomingMessageTuple; >> > -import org.apache.samza.sql.operators.factory.SimpleRouter; >> > +import org.apache.samza.sql.operators.SimpleRouter; >> > import org.apache.samza.sql.operators.join.StreamStreamJoin; >> > import org.apache.samza.sql.operators.partition.PartitionOp; >> > import org.apache.samza.sql.operators.window.BoundedTimeWindow; >> > @@ -51,25 +51,25 @@ import org.apache.samza.task.WindowableTask; >> > */ >> > public class StreamSqlTask implements StreamTask, InitableTask, >> > WindowableTask { >> > >> > - private SimpleRouter rteCntx; >> > + private SimpleRouter router; >> > >> > @Override >> > public void process(IncomingMessageEnvelope envelope, >>MessageCollector >> > collector, TaskCoordinator coordinator) >> > throws Exception { >> > - this.rteCntx.process(new IncomingMessageTuple(envelope), >>collector, >> > coordinator); >> > + this.router.process(new IncomingMessageTuple(envelope), >>collector, >> > coordinator); >> > } >> > >> > @Override >> > public void window(MessageCollector collector, TaskCoordinator >> > coordinator) throws Exception { >> > - this.rteCntx.refresh(System.nanoTime(), collector, coordinator); >> > + this.router.refresh(System.nanoTime(), collector, coordinator); >> > } >> > >> > @Override >> > public void init(Config config, TaskContext context) throws >>Exception >> { >> > // create all operators via the operator factory >> > // 1. create two window operators >> > - BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, >> > "inputStream1", "fixedWndOutput1"); >> > - BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, >> > "inputStream2", "fixedWndOutput2"); >> > + BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, >> > "kafka:inputStream1", "fixedWndOutput1"); >> > + BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, >> > "kafka:inputStream2", "fixedWndOutput2"); >> > // 2. create one join operator >> > @SuppressWarnings("serial") >> > List<String> inputRelations = new ArrayList<String>() { >> > @@ -86,19 +86,19 @@ public class StreamSqlTask implements StreamTask, >> > InitableTask, WindowableTask { >> > } >> > }; >> > StreamStreamJoin join = new StreamStreamJoin("joinOp", >> > inputRelations, "joinOutput", joinKeys); >> > - // 4. create a re-partition operator >> > + // 3. create a re-partition operator >> > PartitionOp par = new PartitionOp("parOp1", "joinOutput", >>"kafka", >> > "parOutputStrm1", "joinKey", 50); >> > >> > // Now, connecting the operators via the OperatorRouter >> > - this.rteCntx = new SimpleRouter(); >> > + this.router = new SimpleRouter(); >> > // 1. set two system input operators (i.e. two window operators) >> > - this.rteCntx.addOperator(wnd1); >> > - this.rteCntx.addOperator(wnd2); >> > + this.router.addOperator(wnd1); >> > + this.router.addOperator(wnd2); >> > // 2. connect join operator to both window operators >> > - this.rteCntx.addOperator(join); >> > + this.router.addOperator(join); >> > // 3. connect re-partition operator to the stream operator >> > - this.rteCntx.addOperator(par); >> > + this.router.addOperator(par); >> > >> > - this.rteCntx.init(config, context); >> > + this.router.init(config, context); >> > } >> > } >> > >> > >> >> >> -- >> Milinda Pathirage >> >> PhD Student | Research Assistant >> School of Informatics and Computing | Data to Insight Center >> Indiana University >> >> twitter: milindalakmal >> skype: milinda.pathirage >> blog: http://milinda.pathirage.org >>
