Repository: samza
Updated Branches:
  refs/heads/samza-sql f29c61498 -> fbdd76daa


http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
 
b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
deleted file mode 100644
index 2854aeb..0000000
--- 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.join;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Stream;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.sql.window.storage.OrderedStoreKey;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SimpleMessageCollector;
-
-
-/**
- * This class implements a simple stream-to-stream join
- */
-public class StreamStreamJoin extends SimpleOperatorImpl {
-  private final StreamStreamJoinSpec spec;
-
-  private Map<EntityName, BoundedTimeWindow> inputWindows = new 
HashMap<EntityName, BoundedTimeWindow>();
-
-  public StreamStreamJoin(StreamStreamJoinSpec spec) {
-    super(spec);
-    this.spec = spec;
-  }
-
-  //TODO: stub constructor to allow compilation pass. Need to construct real 
StreamStreamJoinSpec.
-  public StreamStreamJoin(String opId, List<String> inputRelations, String 
output, List<String> joinKeys) {
-    this(null);
-  }
-
-  //TODO: stub constructor to allow compilation pass. Need to construct real 
StreamStreamJoinSpec.
-  public StreamStreamJoin(String opId, List<String> inputRelations, String 
output, List<String> joinKeys,
-      OperatorCallback callback) {
-    super(null, callback);
-    this.spec = null;
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // TODO Auto-generated method stub
-    // initialize the inputWindows map
-
-  }
-
-  private void join(Tuple tuple, Map<EntityName, Stream> joinSets) {
-    // TODO Auto-generated method stub
-    // Do M-way joins if necessary, it should be ordered based on the orders 
of the input relations in inputs
-    // NOTE: inner joins may be optimized by re-order the input relations by 
joining inputs w/ less join sets first. We will consider it later.
-
-  }
-
-  private Map<EntityName, Stream> findJoinSets(Tuple tuple) {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  private KeyValueIterator<OrderedStoreKey, Tuple> getJoinSet(Tuple tuple, 
EntityName strmName) {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  private List<Entry<String, Object>> getEqualFields(Tuple tuple, EntityName 
strmName) {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  protected void realProcess(Relation deltaRelation, SimpleMessageCollector 
collector, TaskCoordinator coordinator)
-      throws Exception {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  protected void realProcess(Tuple tuple, SimpleMessageCollector collector, 
TaskCoordinator coordinator)
-      throws Exception {
-    // TODO Auto-generated method stub
-    Map<EntityName, Stream> joinSets = findJoinSets(tuple);
-    join(tuple, joinSets);
-  }
-
-  @Override
-  public void realRefresh(long timeNano, SimpleMessageCollector collector, 
TaskCoordinator coordinator)
-      throws Exception {
-    // TODO Auto-generated method stub
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
 
b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
deleted file mode 100644
index cc0aca0..0000000
--- 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.join;
-
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
-
-
-/**
- * This class defines the specification of a {@link 
org.apache.samza.sql.operators.join.StreamStreamJoin} operator
- */
-public class StreamStreamJoinSpec extends SimpleOperatorSpec {
-
-  public StreamStreamJoinSpec(String id, List<EntityName> inputs, EntityName 
output, List<String> joinKeys) {
-    super(id, inputs, output);
-    // TODO Auto-generated constructor stub
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
 
b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
deleted file mode 100644
index b93d789..0000000
--- 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.partition;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SimpleMessageCollector;
-
-
-/**
- * This is an example build-in operator that performs a simple stream 
re-partition operation.
- *
- */
-public class PartitionOp extends SimpleOperatorImpl {
-
-  /**
-   * The specification of this {@code PartitionOp}
-   *
-   */
-  private final PartitionSpec spec;
-
-  /**
-   * Ctor that takes the {@link 
org.apache.samza.sql.operators.partition.PartitionSpec} object as input.
-   *
-   * @param spec The <code>PartitionSpec</code> object
-   */
-  public PartitionOp(PartitionSpec spec) {
-    super(spec);
-    this.spec = spec;
-  }
-
-  /**
-   * A simplified constructor that allow users to randomly create 
<code>PartitionOp</code>
-   *
-   * @param id The identifier of this operator
-   * @param input The input stream name of this operator
-   * @param system The output system name of this operator
-   * @param output The output stream name of this operator
-   * @param parKey The partition key used for the output stream
-   * @param parNum The number of partitions used for the output stream
-   */
-  public PartitionOp(String id, String input, String system, String output, 
String parKey, int parNum) {
-    this(new PartitionSpec(id, input, new SystemStream(system, output), 
parKey, parNum));
-  }
-
-  /**
-   * A simplified constructor that allow users to randomly create 
<code>PartitionOp</code>
-   *
-   * @param id The identifier of this operator
-   * @param input The input stream name of this operator
-   * @param system The output system name of this operator
-   * @param output The output stream name of this operator
-   * @param parKey The partition key used for the output stream
-   * @param parNum The number of partitions used for the output stream
-   * @param callback The callback functions for operator
-   */
-  public PartitionOp(String id, String input, String system, String output, 
String parKey, int parNum,
-      OperatorCallback callback) {
-    super(new PartitionSpec(id, input, new SystemStream(system, output), 
parKey, parNum), callback);
-    this.spec = (PartitionSpec) super.getSpec();
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // TODO Auto-generated method stub
-    // No need to initialize store since all inputs are immediately send out
-  }
-
-  @Override
-  protected void realRefresh(long timeNano, SimpleMessageCollector collector, 
TaskCoordinator coordinator)
-      throws Exception {
-    // TODO Auto-generated method stub
-    // NOOP or flush
-  }
-
-  @Override
-  protected void realProcess(Tuple tuple, SimpleMessageCollector collector, 
TaskCoordinator coordinator)
-      throws Exception {
-    collector.send(new 
OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), 
tuple.getMessage()
-        .getFieldData(PartitionOp.this.spec.getParKey()).value(), 
tuple.getKey().value(), tuple.getMessage().value()));
-  }
-
-  @Override
-  protected void realProcess(Relation deltaRelation, SimpleMessageCollector 
collector, TaskCoordinator coordinator)
-      throws Exception {
-    for(KeyValueIterator<?, Tuple> iter = deltaRelation.all(); iter.hasNext(); 
) {
-      Entry<?, Tuple> entry = iter.next();
-      collector.send(new 
OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), 
entry.getValue().getMessage()
-          .getFieldData(PartitionOp.this.spec.getParKey()).value(), 
entry.getValue().getKey().value(), entry.getValue()
-          .getMessage().value()));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
 
b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
deleted file mode 100644
index c47eed9..0000000
--- 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.partition;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.OperatorSpec;
-import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * This class defines the specification class of {@link 
org.apache.samza.sql.operators.partition.PartitionOp}
- *
- */
-public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec {
-
-  /**
-   * The partition key name
-   */
-  private final String parKey;
-
-  /**
-   * The number of partitions
-   */
-  private final int parNum;
-
-  /**
-   * The <code>SystemStream</code> to send the partition output to
-   */
-  private final SystemStream sysStream;
-
-  /**
-   * Ctor to create the {@code PartitionSpec}
-   *
-   * @param id The ID of the {@link 
org.apache.samza.sql.operators.partition.PartitionOp}
-   * @param input The input stream name
-   * @param output The output {@link org.apache.samza.system.SystemStream} 
object
-   * @param parKey The name of the partition key
-   * @param parNum The number of partitions
-   */
-  public PartitionSpec(String id, String input, SystemStream output, String 
parKey, int parNum) {
-    super(id, EntityName.getStreamName(input), 
EntityName.getStreamName(output.getSystem() + ":" + output.getStream()));
-    this.parKey = parKey;
-    this.parNum = parNum;
-    this.sysStream = output;
-  }
-
-  /**
-   * Method to get the partition key name
-   *
-   * @return The partition key name
-   */
-  public String getParKey() {
-    return this.parKey;
-  }
-
-  /**
-   * Method to get the number of partitions
-   *
-   * @return The number of partitions
-   */
-  public int getParNum() {
-    return this.parNum;
-  }
-
-  /**
-   * Method to get the output {@link org.apache.samza.system.SystemStream}
-   *
-   * @return The output system stream object
-   */
-  public SystemStream getSystemStream() {
-    return this.sysStream;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
 
b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
deleted file mode 100644
index d81cc93..0000000
--- 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.window;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SimpleMessageCollector;
-
-
-/**
- * This class defines an example build-in operator for a fixed size window 
operator that converts a stream to a relation
- *
- */
-public class BoundedTimeWindow extends SimpleOperatorImpl {
-
-  /**
-   * The specification of this window operator
-   */
-  private final WindowSpec spec;
-
-  /**
-   * The relation that the window operator keeps internally
-   */
-  private Relation relation = null;
-
-  /**
-   * The list of window states of all active windows the window operator keeps 
in track
-   */
-  private List<WindowState> windowStates = null;
-
-  /**
-   * Ctor that takes <code>WindowSpec</code> specification as input argument
-   *
-   * <p>This version of constructor is often used in an implementation of 
{@link org.apache.samza.sql.api.operators.SqlOperatorFactory}
-   *
-   * @param spec The window specification object
-   */
-  public BoundedTimeWindow(WindowSpec spec) {
-    super(spec);
-    this.spec = spec;
-  }
-
-  /**
-   * A simplified version of ctor that allows users to randomly created a 
window operator w/o spec object
-   *
-   * @param wndId The identifier of this window operator
-   * @param lengthSec The window size in seconds
-   * @param input The input stream name
-   * @param output The output relation name
-   */
-  public BoundedTimeWindow(String wndId, int lengthSec, String input, String 
output) {
-    this(new WindowSpec(wndId, EntityName.getStreamName(input), 
EntityName.getStreamName(output), lengthSec));
-  }
-
-  /**
-   * A simplified version of ctor that allows users to randomly created a 
window operator w/o spec object
-   *
-   * @param wndId The identifier of this window operator
-   * @param lengthSec The window size in seconds
-   * @param input The input stream name
-   * @param output The output relation name
-   */
-  public BoundedTimeWindow(String wndId, int lengthSec, String input, String 
output, OperatorCallback callback) {
-    super(new WindowSpec(wndId, EntityName.getStreamName(input), 
EntityName.getStreamName(output), lengthSec), callback);
-    this.spec = (WindowSpec) super.getSpec();
-  }
-
-  private void processWindowChanges(SimpleMessageCollector collector) throws 
Exception {
-    if (windowStateChange()) {
-      collector.send(getWindowChanges());
-    }
-  }
-
-  private Relation getWindowChanges() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  private boolean windowStateChange() {
-    // TODO Auto-generated method stub
-    return getWindowChanges() != null;
-  }
-
-  private void updateWindow(Tuple tuple) {
-    // TODO Auto-generated method stub
-    // The window states are updated here
-    // And the correpsonding deltaChanges is also calculated here.
-  }
-
-  private void updateWindowTimeout() {
-    // TODO Auto-generated method stub
-    // The window states are updated here
-    // And the correpsonding deltaChanges is also calculated here.
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // TODO Auto-generated method stub
-    if (this.relation == null) {
-      this.relation = (Relation) 
context.getStore(this.spec.getOutputName().toString());
-      Relation wndStates = (Relation) 
context.getStore(this.spec.getWndStatesName());
-      this.windowStates = new ArrayList<WindowState>();
-      for (KeyValueIterator<Object, Tuple> iter = wndStates.all(); 
iter.hasNext();) {
-        this.windowStates.add((WindowState) 
iter.next().getValue().getMessage());
-      }
-    }
-  }
-
-  @Override
-  protected void realProcess(Tuple tuple, SimpleMessageCollector collector, 
TaskCoordinator coordinator)
-      throws Exception {
-    // for each tuple, this will evaluate the incoming tuple and update the 
window states.
-    // If the window states allow generating output, calculate the delta 
changes in
-    // the window relation and execute the relation operation 
<code>nextOp</code>
-    updateWindow(tuple);
-    processWindowChanges(collector);
-  }
-
-  @Override
-  protected void realProcess(Relation rel, SimpleMessageCollector collector, 
TaskCoordinator coordinator)
-      throws Exception {
-    for (KeyValueIterator<Object, Tuple> iter = rel.all(); iter.hasNext();) {
-      updateWindow(iter.next().getValue());
-      processWindowChanges(collector);
-    }
-  }
-
-  @Override
-  protected void realRefresh(long timeNano, SimpleMessageCollector collector, 
TaskCoordinator coordinator)
-      throws Exception {
-    updateWindowTimeout();
-    processWindowChanges(collector);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
 
b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
deleted file mode 100644
index eec32ea..0000000
--- 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.window;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.OperatorSpec;
-import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
-
-
-/**
- * This class implements the specification class for the build-in {@link 
org.apache.samza.sql.operators.window.BoundedTimeWindow} operator
- */
-public class WindowSpec extends SimpleOperatorSpec implements OperatorSpec {
-
-  /**
-   * The window size in seconds
-   */
-  private final int wndSizeSec;
-
-  /**
-   * Default ctor of the {@code WindowSpec} object
-   *
-   * @param id The identifier of the operator
-   * @param input The input stream entity
-   * @param output The output relation entity
-   * @param lengthSec The window size in seconds
-   */
-  public WindowSpec(String id, EntityName input, EntityName output, int 
lengthSec) {
-    super(id, input, output);
-    this.wndSizeSec = lengthSec;
-  }
-
-  /**
-   * Method to get the window state relation name
-   *
-   * @return The window state relation name
-   */
-  public String getWndStatesName() {
-    return this.getId() + "-wnd-state";
-  }
-
-  /**
-   * Method to get the window size in seconds
-   *
-   * @return The window size in seconds
-   */
-  public int getWndSizeSec() {
-    return this.wndSizeSec;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
 
b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
deleted file mode 100644
index 48547f0..0000000
--- 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.window;
-
-public class WindowState {
-  public String startOffset = null;
-  public String endOffset = null;
-  public boolean isClosed = false;
-
-  public void open(String offset) {
-    this.isClosed = false;
-    this.startOffset = offset;
-  }
-
-  public void close(String offset) {
-    this.endOffset = offset;
-    this.isClosed = true;
-  }
-
-  public void advanceTo(String offset) {
-    this.endOffset = offset;
-  }
-
-  public boolean isClosed() {
-    return this.isClosed;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java
 
b/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java
deleted file mode 100644
index e56d3b3..0000000
--- 
a/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.window.storage;
-
-/**
- * This defines the base class for all keys used in window operators
- */
-public abstract class OrderedStoreKey implements Comparable<OrderedStoreKey> {
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java 
b/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java
deleted file mode 100644
index 102819d..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.sql;
-
-/**
- * An implementation of {@link org.apache.samza.system.sql.Offset}, w/ {@code 
long} value as the offset
- */
-public class LongOffset implements Offset {
-
-  /**
-   * The offset value in {@code long}
-   */
-  private final Long offset;
-
-  private LongOffset(long offset) {
-    this.offset = offset;
-  }
-
-  public LongOffset(String offset) {
-    this.offset = Long.valueOf(offset);
-  }
-
-  @Override
-  public int compareTo(Offset o) {
-    if (!(o instanceof LongOffset)) {
-      throw new IllegalArgumentException("Not comparable offset classes. 
LongOffset vs " + o.getClass().getName());
-    }
-    LongOffset other = (LongOffset) o;
-    return this.offset.compareTo(other.offset);
-  }
-
-  /**
-   * Helper method to get the minimum offset
-   *
-   * @return The minimum offset
-   */
-  public static LongOffset getMinOffset() {
-    return new LongOffset(Long.MIN_VALUE);
-  }
-
-  /**
-   * Helper method to get the maximum offset
-   *
-   * @return The maximum offset
-   */
-  public static LongOffset getMaxOffset() {
-    return new LongOffset(Long.MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java 
b/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java
deleted file mode 100644
index 98547e2..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.sql;
-
-/**
- * A generic interface extending {@link java.lang.Comparable} to be used as 
{@code Offset} in a stream
- */
-public interface Offset extends Comparable<Offset> {
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java
 
b/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java
deleted file mode 100644
index 18eb0a3..0000000
--- 
a/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorRouter;
-import org.apache.samza.sql.api.operators.SimpleOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * This class extends {@link org.apache.samza.task.sql.SimpleMessageCollector} 
that uses {@link org.apache.samza.sql.api.operators.OperatorRouter}
- *
- */
-public class RouterMessageCollector extends SimpleMessageCollector {
-
-  private final OperatorRouter rteCntx;
-
-  public RouterMessageCollector(MessageCollector collector, TaskCoordinator 
coordinator, OperatorRouter rteCntx) {
-    super(collector, coordinator);
-    this.rteCntx = rteCntx;
-  }
-
-  @Override
-  protected void realSend(Relation rel) throws Exception {
-    for (SimpleOperator op : this.rteCntx.getNextOperators(rel.getName())) {
-      op.process(rel, this, coordinator);
-    }
-  }
-
-  @Override
-  protected void realSend(Tuple tuple) throws Exception {
-    for (SimpleOperator op : 
this.rteCntx.getNextOperators(tuple.getEntityName())) {
-      op.process(tuple, this, coordinator);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
 
b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
deleted file mode 100644
index b29838a..0000000
--- 
a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.operators.factory.NoopOperatorCallback;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-public class SimpleMessageCollector implements MessageCollector {
-  protected final MessageCollector collector;
-  protected final TaskCoordinator coordinator;
-  protected OperatorCallback callback;
-
-  /**
-   * Ctor that creates the {@code SimpleMessageCollector} from scratch
-   * @param collector The {@link org.apache.samza.task.MessageCollector} in 
the context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in 
the context
-   * @param callback The {@link 
org.apache.samza.sql.api.operators.OperatorCallback} in the context
-   */
-  public SimpleMessageCollector(MessageCollector collector, TaskCoordinator 
coordinator, OperatorCallback callback) {
-    this.collector = collector;
-    this.coordinator = coordinator;
-    if (callback == null) {
-      this.callback = new NoopOperatorCallback();
-    } else {
-      this.callback = callback;
-    }
-  }
-
-  /**
-   * Ctor that creates the {@code SimpleMessageCollector} from scratch
-   * @param collector The {@link org.apache.samza.task.MessageCollector} in 
the context
-   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in 
the context
-   */
-  public SimpleMessageCollector(MessageCollector collector, TaskCoordinator 
coordinator) {
-    this.collector = collector;
-    this.coordinator = coordinator;
-  }
-
-  /**
-   * This method swaps the {@code callback} with the new one
-   *
-   * <p> This method allows the {@link 
org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when the 
collector
-   * is passed down into the next operator's context. Hence, under the new 
operator's context, the correct {@link 
org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation, 
MessageCollector, TaskCoordinator)},
-   * and {@link 
org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple, 
MessageCollector, TaskCoordinator)} can be invoked
-   *
-   * @param callback The new {@link 
org.apache.samza.sql.api.operators.OperatorCallback} to be set
-   */
-  public void switchOperatorCallback(OperatorCallback callback) {
-    this.callback = callback;
-  }
-
-  /**
-   * Method is declared to be final s.t. we enforce that the callback 
functions are called first
-   */
-  final public void send(Relation deltaRelation) throws Exception {
-    Relation rel = this.callback.afterProcess(deltaRelation, collector, 
coordinator);
-    if (rel == null) {
-      return;
-    }
-    this.realSend(rel);
-  }
-
-  /**
-   * Method is declared to be final s.t. we enforce that the callback 
functions are called first
-   */
-  final public void send(Tuple tuple) throws Exception {
-    Tuple otuple = this.callback.afterProcess(tuple, collector, coordinator);
-    if (otuple == null) {
-      return;
-    }
-    this.realSend(otuple);
-  }
-
-  protected void realSend(Relation rel) throws Exception {
-    for (KeyValueIterator<?, Tuple> iter = rel.all(); iter.hasNext();) {
-      Entry<?, Tuple> entry = iter.next();
-      this.collector.send((OutgoingMessageEnvelope) 
entry.getValue().getMessage());
-    }
-  }
-
-  protected void realSend(Tuple tuple) throws Exception {
-    this.collector.send((OutgoingMessageEnvelope) tuple.getMessage());
-  }
-
-  @Override
-  public void send(OutgoingMessageEnvelope envelope) {
-    this.collector.send(envelope);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java
 
b/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java
deleted file mode 100644
index 7412669..0000000
--- 
a/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.data.serializers;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.sql.data.avro.AvroData;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SqlAvroSerdeTest {
-  public static final String ORDER_SCHEMA = "{\"namespace\": 
\"org.apache.samza.sql\",\n"+
-      " \"type\": \"record\",\n"+
-      " \"name\": \"Order\",\n"+
-      " \"fields\": [\n"+
-      "     {\"name\": \"id\", \"type\": \"int\"},\n"+
-      "     {\"name\": \"product\",  \"type\": \"string\"},\n"+
-      "     {\"name\": \"quantity\", \"type\": \"int\"}\n"+
-      " ]\n"+
-      "}";
-
-  public static Schema orderSchema = new Schema.Parser().parse(ORDER_SCHEMA);
-  private static Serde serde = new SqlAvroSerdeFactory().getSerde("sqlAvro", 
sqlAvroSerdeTestConfig());
-
-  @Test
-  public void testSqlAvroSerdeDeserialization() throws IOException {
-    AvroData decodedDatum = 
(AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
-
-    Assert.assertTrue(decodedDatum.schema().getType() == 
org.apache.samza.sql.api.data.Schema.Type.STRUCT);
-    Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == 
org.apache.samza.sql.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() 
== org.apache.samza.sql.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() 
== org.apache.samza.sql.api.data.Schema.Type.STRING);
-  }
-
-  @Test
-  public void testSqlAvroSerialization() throws IOException {
-    AvroData decodedDatumOriginal = 
(AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
-    byte[] encodedDatum = serde.toBytes(decodedDatumOriginal);
-
-    AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum);
-
-    Assert.assertTrue(decodedDatum.schema().getType() == 
org.apache.samza.sql.api.data.Schema.Type.STRUCT);
-    Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == 
org.apache.samza.sql.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() 
== org.apache.samza.sql.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() 
== org.apache.samza.sql.api.data.Schema.Type.STRING);
-  }
-
-  private static Config sqlAvroSerdeTestConfig(){
-    Map<String, String> config = new HashMap<String, String>();
-    config.put("serializers.sqlAvro.schema", ORDER_SCHEMA);
-
-    return new MapConfig(config);
-  }
-
-  private static byte[] encodeMessage(GenericRecord datum, Schema avroSchema) 
throws IOException {
-    DatumWriter<GenericRecord> writer = new 
GenericDatumWriter<GenericRecord>(avroSchema);
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, null);
-    writer.write(datum, encoder);
-    encoder.flush();
-
-    return  output.toByteArray();
-  }
-
-  private static GenericRecord sampleOrderRecord(){
-    GenericData.Record datum = new GenericData.Record(orderSchema);
-    datum.put("id", 1);
-    datum.put("product", "paint");
-    datum.put("quantity", 3);
-
-    return datum;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
 
b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
deleted file mode 100644
index 20dc701..0000000
--- 
a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-
-/***
- * This example illustrate a use case for the full-state timed window operator
- *
- */
-public class RandomWindowOperatorTask implements StreamTask, InitableTask, 
WindowableTask {
-  private BoundedTimeWindow wndOp;
-
-  private final OperatorCallback wndCallback = new OperatorCallback() {
-
-    @Override
-    public Tuple beforeProcess(Tuple tuple, MessageCollector collector, 
TaskCoordinator coordinator) {
-      return tuple;
-    }
-
-    @Override
-    public Relation beforeProcess(Relation rel, MessageCollector collector, 
TaskCoordinator coordinator) {
-      return rel;
-    }
-
-    @Override
-    public Relation afterProcess(Relation rel, MessageCollector collector, 
TaskCoordinator coordinator) {
-      return rel;
-    }
-
-    @Override
-    public Tuple afterProcess(Tuple tuple, MessageCollector collector, 
TaskCoordinator coordinator) {
-      return filterWindowOutput(tuple, collector, coordinator);
-    }
-
-    private Tuple filterWindowOutput(Tuple tuple, MessageCollector collector, 
TaskCoordinator coordinator) {
-      // filter all delete tuples before send
-      if (tuple.isDelete()) {
-        return null;
-      }
-      return tuple;
-    }
-
-  };
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator)
-      throws Exception {
-    // based on tuple's stream name, get the window op and run process()
-    wndOp.process(new IncomingMessageTuple(envelope), collector, coordinator);
-
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) 
throws Exception {
-    // based on tuple's stream name, get the window op and run process()
-    wndOp.refresh(System.nanoTime(), collector, coordinator);
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // 1. create a fixed length 10 sec window operator
-    this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", 
"relation1", this.wndCallback);
-    this.wndOp.init(config, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 
b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
deleted file mode 100644
index 9124e3c..0000000
--- a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.factory.SimpleRouter;
-import org.apache.samza.sql.operators.join.StreamStreamJoin;
-import org.apache.samza.sql.operators.partition.PartitionOp;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-
-/***
- * This example illustrate a SQL join operation that joins two streams 
together using the folowing operations:
- * <ul>
- * <li>a. the two streams are each processed by a window operator to convert 
to relations
- * <li>b. a join operator is applied on the two relations to generate join 
results
- * <li>c. an istream operator is applied on join output and convert the 
relation into a stream
- * <li>d. a partition operator that re-partitions the output stream from 
istream and send the stream to system output
- * </ul>
- *
- * This example also uses an implementation of 
<code>SqlMessageCollector</code> (@see <code>OperatorMessageCollector</code>)
- * that uses <code>OperatorRouter</code> to automatically execute the whole 
paths that connects operators together.
- */
-public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask 
{
-
-  private SimpleRouter rteCntx;
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator)
-      throws Exception {
-    this.rteCntx.process(new IncomingMessageTuple(envelope), collector, 
coordinator);
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) 
throws Exception {
-    this.rteCntx.refresh(System.nanoTime(), collector, coordinator);
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // create all operators via the operator factory
-    // 1. create two window operators
-    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, 
"inputStream1", "fixedWndOutput1");
-    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, 
"inputStream2", "fixedWndOutput2");
-    // 2. create one join operator
-    @SuppressWarnings("serial")
-    List<String> inputRelations = new ArrayList<String>() {
-      {
-        add("fixedWndOutput1");
-        add("fixedWndOutput2");
-      }
-    };
-    @SuppressWarnings("serial")
-    List<String> joinKeys = new ArrayList<String>() {
-      {
-        add("key1");
-        add("key2");
-      }
-    };
-    StreamStreamJoin join = new StreamStreamJoin("joinOp", inputRelations, 
"joinOutput", joinKeys);
-    // 4. create a re-partition operator
-    PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka", 
"parOutputStrm1", "joinKey", 50);
-
-    // Now, connecting the operators via the OperatorRouter
-    this.rteCntx = new SimpleRouter();
-    // 1. set two system input operators (i.e. two window operators)
-    this.rteCntx.addOperator(wnd1);
-    this.rteCntx.addOperator(wnd2);
-    // 2. connect join operator to both window operators
-    this.rteCntx.addOperator(join);
-    // 3. connect re-partition operator to the stream operator
-    this.rteCntx.addOperator(par);
-
-    this.rteCntx.init(config, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
 
b/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
deleted file mode 100644
index 96e96c3..0000000
--- 
a/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Stream;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.OperatorCallback;
-import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.factory.SimpleRouter;
-import org.apache.samza.sql.operators.join.StreamStreamJoin;
-import org.apache.samza.sql.operators.partition.PartitionOp;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-
-/***
- * This example illustrate a SQL join operation that joins two streams 
together using the folowing operations:
- * <ul>
- * <li>a. the two streams are each processed by a window operator to convert 
to relations
- * <li>b. a join operator is applied on the two relations to generate join 
results
- * <li>c. an istream operator is applied on join output and convert the 
relation into a stream
- * <li>d. a partition operator that re-partitions the output stream from 
istream and send the stream to system output
- * </ul>
- *
- * This example also uses an implementation of 
<code>SqlMessageCollector</code> (@see <code>OperatorMessageCollector</code>)
- * that uses <code>OperatorRouter</code> to automatically execute the whole 
paths that connects operators together.
- */
-public class UserCallbacksSqlTask implements StreamTask, InitableTask, 
WindowableTask {
-
-  private SimpleRouter simpleRtr;
-
-  private final OperatorCallback wndCallback = new OperatorCallback() {
-
-    @Override
-    public Tuple beforeProcess(Tuple tuple, MessageCollector collector, 
TaskCoordinator coordinator) {
-      return filterWindowInput(tuple, collector, coordinator);
-    }
-
-    @Override
-    public Relation beforeProcess(Relation rel, MessageCollector collector, 
TaskCoordinator coordinator) {
-      return onRelationInput(rel, collector, coordinator);
-    }
-
-    @Override
-    public Relation afterProcess(Relation rel, MessageCollector collector, 
TaskCoordinator coordinator) {
-      return rel;
-    }
-
-    @Override
-    public Tuple afterProcess(Tuple tuple, MessageCollector collector, 
TaskCoordinator coordinator) {
-      return tuple;
-    }
-
-    private Tuple filterWindowInput(Tuple tuple, MessageCollector collector, 
TaskCoordinator coordinator) {
-      // filter all delete tuples before send
-      if (tuple.isDelete()) {
-        return null;
-      }
-      return tuple;
-    }
-
-    private Relation onRelationInput(Relation rel, MessageCollector collector,
-        TaskCoordinator coordinator) {
-      // check whether the input is a stream
-      if (!(rel instanceof Stream<?>)) {
-        throw new IllegalArgumentException("Wrong input entity");
-      }
-      return rel;
-    }
-  };
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator)
-      throws Exception {
-    this.simpleRtr.process(new IncomingMessageTuple(envelope), collector, 
coordinator);
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) 
throws Exception {
-    this.simpleRtr.refresh(System.nanoTime(), collector, coordinator);
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // create all operators via the operator factory
-    // 1. create two window operators
-    BoundedTimeWindow wnd1 =
-        new BoundedTimeWindow("fixedWnd1", 10, "inputStream1", 
"fixedWndOutput1", this.wndCallback);
-    BoundedTimeWindow wnd2 =
-        new BoundedTimeWindow("fixedWnd2", 10, "inputStream2", 
"fixedWndOutput2", this.wndCallback);
-    // 2. create one join operator
-    @SuppressWarnings("serial")
-    List<String> inputRelations = new ArrayList<String>() {
-      {
-        add("fixedWndOutput1");
-        add("fixedWndOutput2");
-      }
-    };
-    @SuppressWarnings("serial")
-    List<String> joinKeys = new ArrayList<String>() {
-      {
-        add("key1");
-        add("key2");
-      }
-    };
-    StreamStreamJoin join = new StreamStreamJoin("joinOp", inputRelations, 
"joinOutput", joinKeys);
-    // 4. create a re-partition operator
-    PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka", 
"parOutputStrm1", "joinKey", 50);
-
-    // Now, connecting the operators via the OperatorRouter
-    this.simpleRtr = new SimpleRouter();
-    // 1. set two system input operators (i.e. two window operators)
-    this.simpleRtr.addOperator(wnd1);
-    this.simpleRtr.addOperator(wnd2);
-    // 2. connect join operator to both window operators
-    this.simpleRtr.addOperator(join);
-    // 3. connect re-partition operator to the stream operator
-    this.simpleRtr.addOperator(par);
-
-    this.simpleRtr.init(config, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index f1903c6..fde8ab5 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,7 +23,7 @@ include \
   'samza-shell'
 
 def jdk8Modules = [
-  'samza-sql-core',
+  'samza-operator',
   'samza-sql-calcite'
 ] as HashSet
 

Reply via email to