Repository: samza Updated Branches: refs/heads/samza-sql f29c61498 -> fbdd76daa
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java deleted file mode 100644 index 2854aeb..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.operators.join; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.samza.config.Config; -import org.apache.samza.sql.api.data.EntityName; -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Stream; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.sql.api.operators.OperatorCallback; -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; -import org.apache.samza.sql.operators.window.BoundedTimeWindow; -import org.apache.samza.sql.window.storage.OrderedStoreKey; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueIterator; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.task.sql.SimpleMessageCollector; - - -/** - * This class implements a simple stream-to-stream join - */ -public class StreamStreamJoin extends SimpleOperatorImpl { - private final StreamStreamJoinSpec spec; - - private Map<EntityName, BoundedTimeWindow> inputWindows = new HashMap<EntityName, BoundedTimeWindow>(); - - public StreamStreamJoin(StreamStreamJoinSpec spec) { - super(spec); - this.spec = spec; - } - - //TODO: stub constructor to allow compilation pass. Need to construct real StreamStreamJoinSpec. - public StreamStreamJoin(String opId, List<String> inputRelations, String output, List<String> joinKeys) { - this(null); - } - - //TODO: stub constructor to allow compilation pass. Need to construct real StreamStreamJoinSpec. - public StreamStreamJoin(String opId, List<String> inputRelations, String output, List<String> joinKeys, - OperatorCallback callback) { - super(null, callback); - this.spec = null; - } - - @Override - public void init(Config config, TaskContext context) throws Exception { - // TODO Auto-generated method stub - // initialize the inputWindows map - - } - - private void join(Tuple tuple, Map<EntityName, Stream> joinSets) { - // TODO Auto-generated method stub - // Do M-way joins if necessary, it should be ordered based on the orders of the input relations in inputs - // NOTE: inner joins may be optimized by re-order the input relations by joining inputs w/ less join sets first. We will consider it later. - - } - - private Map<EntityName, Stream> findJoinSets(Tuple tuple) { - // TODO Auto-generated method stub - return null; - } - - private KeyValueIterator<OrderedStoreKey, Tuple> getJoinSet(Tuple tuple, EntityName strmName) { - // TODO Auto-generated method stub - return null; - } - - private List<Entry<String, Object>> getEqualFields(Tuple tuple, EntityName strmName) { - // TODO Auto-generated method stub - return null; - } - - @Override - protected void realProcess(Relation deltaRelation, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception { - // TODO Auto-generated method stub - - } - - @Override - protected void realProcess(Tuple tuple, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception { - // TODO Auto-generated method stub - Map<EntityName, Stream> joinSets = findJoinSets(tuple); - join(tuple, joinSets); - } - - @Override - public void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception { - // TODO Auto-generated method stub - - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java deleted file mode 100644 index cc0aca0..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.operators.join; - -import java.util.List; - -import org.apache.samza.sql.api.data.EntityName; -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; - - -/** - * This class defines the specification of a {@link org.apache.samza.sql.operators.join.StreamStreamJoin} operator - */ -public class StreamStreamJoinSpec extends SimpleOperatorSpec { - - public StreamStreamJoinSpec(String id, List<EntityName> inputs, EntityName output, List<String> joinKeys) { - super(id, inputs, output); - // TODO Auto-generated constructor stub - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java deleted file mode 100644 index b93d789..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.operators.partition; - -import org.apache.samza.config.Config; -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.sql.api.operators.OperatorCallback; -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueIterator; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.task.sql.SimpleMessageCollector; - - -/** - * This is an example build-in operator that performs a simple stream re-partition operation. - * - */ -public class PartitionOp extends SimpleOperatorImpl { - - /** - * The specification of this {@code PartitionOp} - * - */ - private final PartitionSpec spec; - - /** - * Ctor that takes the {@link org.apache.samza.sql.operators.partition.PartitionSpec} object as input. - * - * @param spec The <code>PartitionSpec</code> object - */ - public PartitionOp(PartitionSpec spec) { - super(spec); - this.spec = spec; - } - - /** - * A simplified constructor that allow users to randomly create <code>PartitionOp</code> - * - * @param id The identifier of this operator - * @param input The input stream name of this operator - * @param system The output system name of this operator - * @param output The output stream name of this operator - * @param parKey The partition key used for the output stream - * @param parNum The number of partitions used for the output stream - */ - public PartitionOp(String id, String input, String system, String output, String parKey, int parNum) { - this(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum)); - } - - /** - * A simplified constructor that allow users to randomly create <code>PartitionOp</code> - * - * @param id The identifier of this operator - * @param input The input stream name of this operator - * @param system The output system name of this operator - * @param output The output stream name of this operator - * @param parKey The partition key used for the output stream - * @param parNum The number of partitions used for the output stream - * @param callback The callback functions for operator - */ - public PartitionOp(String id, String input, String system, String output, String parKey, int parNum, - OperatorCallback callback) { - super(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum), callback); - this.spec = (PartitionSpec) super.getSpec(); - } - - @Override - public void init(Config config, TaskContext context) throws Exception { - // TODO Auto-generated method stub - // No need to initialize store since all inputs are immediately send out - } - - @Override - protected void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception { - // TODO Auto-generated method stub - // NOOP or flush - } - - @Override - protected void realProcess(Tuple tuple, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception { - collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getMessage() - .getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getKey().value(), tuple.getMessage().value())); - } - - @Override - protected void realProcess(Relation deltaRelation, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception { - for(KeyValueIterator<?, Tuple> iter = deltaRelation.all(); iter.hasNext(); ) { - Entry<?, Tuple> entry = iter.next(); - collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), entry.getValue().getMessage() - .getFieldData(PartitionOp.this.spec.getParKey()).value(), entry.getValue().getKey().value(), entry.getValue() - .getMessage().value())); - } - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java deleted file mode 100644 index c47eed9..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.operators.partition; - -import org.apache.samza.sql.api.data.EntityName; -import org.apache.samza.sql.api.operators.OperatorSpec; -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; -import org.apache.samza.system.SystemStream; - - -/** - * This class defines the specification class of {@link org.apache.samza.sql.operators.partition.PartitionOp} - * - */ -public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec { - - /** - * The partition key name - */ - private final String parKey; - - /** - * The number of partitions - */ - private final int parNum; - - /** - * The <code>SystemStream</code> to send the partition output to - */ - private final SystemStream sysStream; - - /** - * Ctor to create the {@code PartitionSpec} - * - * @param id The ID of the {@link org.apache.samza.sql.operators.partition.PartitionOp} - * @param input The input stream name - * @param output The output {@link org.apache.samza.system.SystemStream} object - * @param parKey The name of the partition key - * @param parNum The number of partitions - */ - public PartitionSpec(String id, String input, SystemStream output, String parKey, int parNum) { - super(id, EntityName.getStreamName(input), EntityName.getStreamName(output.getSystem() + ":" + output.getStream())); - this.parKey = parKey; - this.parNum = parNum; - this.sysStream = output; - } - - /** - * Method to get the partition key name - * - * @return The partition key name - */ - public String getParKey() { - return this.parKey; - } - - /** - * Method to get the number of partitions - * - * @return The number of partitions - */ - public int getParNum() { - return this.parNum; - } - - /** - * Method to get the output {@link org.apache.samza.system.SystemStream} - * - * @return The output system stream object - */ - public SystemStream getSystemStream() { - return this.sysStream; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java deleted file mode 100644 index d81cc93..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.operators.window; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.samza.config.Config; -import org.apache.samza.sql.api.data.EntityName; -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.sql.api.operators.OperatorCallback; -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; -import org.apache.samza.storage.kv.KeyValueIterator; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.task.sql.SimpleMessageCollector; - - -/** - * This class defines an example build-in operator for a fixed size window operator that converts a stream to a relation - * - */ -public class BoundedTimeWindow extends SimpleOperatorImpl { - - /** - * The specification of this window operator - */ - private final WindowSpec spec; - - /** - * The relation that the window operator keeps internally - */ - private Relation relation = null; - - /** - * The list of window states of all active windows the window operator keeps in track - */ - private List<WindowState> windowStates = null; - - /** - * Ctor that takes <code>WindowSpec</code> specification as input argument - * - * <p>This version of constructor is often used in an implementation of {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} - * - * @param spec The window specification object - */ - public BoundedTimeWindow(WindowSpec spec) { - super(spec); - this.spec = spec; - } - - /** - * A simplified version of ctor that allows users to randomly created a window operator w/o spec object - * - * @param wndId The identifier of this window operator - * @param lengthSec The window size in seconds - * @param input The input stream name - * @param output The output relation name - */ - public BoundedTimeWindow(String wndId, int lengthSec, String input, String output) { - this(new WindowSpec(wndId, EntityName.getStreamName(input), EntityName.getStreamName(output), lengthSec)); - } - - /** - * A simplified version of ctor that allows users to randomly created a window operator w/o spec object - * - * @param wndId The identifier of this window operator - * @param lengthSec The window size in seconds - * @param input The input stream name - * @param output The output relation name - */ - public BoundedTimeWindow(String wndId, int lengthSec, String input, String output, OperatorCallback callback) { - super(new WindowSpec(wndId, EntityName.getStreamName(input), EntityName.getStreamName(output), lengthSec), callback); - this.spec = (WindowSpec) super.getSpec(); - } - - private void processWindowChanges(SimpleMessageCollector collector) throws Exception { - if (windowStateChange()) { - collector.send(getWindowChanges()); - } - } - - private Relation getWindowChanges() { - // TODO Auto-generated method stub - return null; - } - - private boolean windowStateChange() { - // TODO Auto-generated method stub - return getWindowChanges() != null; - } - - private void updateWindow(Tuple tuple) { - // TODO Auto-generated method stub - // The window states are updated here - // And the correpsonding deltaChanges is also calculated here. - } - - private void updateWindowTimeout() { - // TODO Auto-generated method stub - // The window states are updated here - // And the correpsonding deltaChanges is also calculated here. - } - - @Override - public void init(Config config, TaskContext context) throws Exception { - // TODO Auto-generated method stub - if (this.relation == null) { - this.relation = (Relation) context.getStore(this.spec.getOutputName().toString()); - Relation wndStates = (Relation) context.getStore(this.spec.getWndStatesName()); - this.windowStates = new ArrayList<WindowState>(); - for (KeyValueIterator<Object, Tuple> iter = wndStates.all(); iter.hasNext();) { - this.windowStates.add((WindowState) iter.next().getValue().getMessage()); - } - } - } - - @Override - protected void realProcess(Tuple tuple, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception { - // for each tuple, this will evaluate the incoming tuple and update the window states. - // If the window states allow generating output, calculate the delta changes in - // the window relation and execute the relation operation <code>nextOp</code> - updateWindow(tuple); - processWindowChanges(collector); - } - - @Override - protected void realProcess(Relation rel, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception { - for (KeyValueIterator<Object, Tuple> iter = rel.all(); iter.hasNext();) { - updateWindow(iter.next().getValue()); - processWindowChanges(collector); - } - } - - @Override - protected void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception { - updateWindowTimeout(); - processWindowChanges(collector); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java deleted file mode 100644 index eec32ea..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.operators.window; - -import org.apache.samza.sql.api.data.EntityName; -import org.apache.samza.sql.api.operators.OperatorSpec; -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; - - -/** - * This class implements the specification class for the build-in {@link org.apache.samza.sql.operators.window.BoundedTimeWindow} operator - */ -public class WindowSpec extends SimpleOperatorSpec implements OperatorSpec { - - /** - * The window size in seconds - */ - private final int wndSizeSec; - - /** - * Default ctor of the {@code WindowSpec} object - * - * @param id The identifier of the operator - * @param input The input stream entity - * @param output The output relation entity - * @param lengthSec The window size in seconds - */ - public WindowSpec(String id, EntityName input, EntityName output, int lengthSec) { - super(id, input, output); - this.wndSizeSec = lengthSec; - } - - /** - * Method to get the window state relation name - * - * @return The window state relation name - */ - public String getWndStatesName() { - return this.getId() + "-wnd-state"; - } - - /** - * Method to get the window size in seconds - * - * @return The window size in seconds - */ - public int getWndSizeSec() { - return this.wndSizeSec; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java deleted file mode 100644 index 48547f0..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.operators.window; - -public class WindowState { - public String startOffset = null; - public String endOffset = null; - public boolean isClosed = false; - - public void open(String offset) { - this.isClosed = false; - this.startOffset = offset; - } - - public void close(String offset) { - this.endOffset = offset; - this.isClosed = true; - } - - public void advanceTo(String offset) { - this.endOffset = offset; - } - - public boolean isClosed() { - return this.isClosed; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java b/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java deleted file mode 100644 index e56d3b3..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.window.storage; - -/** - * This defines the base class for all keys used in window operators - */ -public abstract class OrderedStoreKey implements Comparable<OrderedStoreKey> { -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java b/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java deleted file mode 100644 index 102819d..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.system.sql; - -/** - * An implementation of {@link org.apache.samza.system.sql.Offset}, w/ {@code long} value as the offset - */ -public class LongOffset implements Offset { - - /** - * The offset value in {@code long} - */ - private final Long offset; - - private LongOffset(long offset) { - this.offset = offset; - } - - public LongOffset(String offset) { - this.offset = Long.valueOf(offset); - } - - @Override - public int compareTo(Offset o) { - if (!(o instanceof LongOffset)) { - throw new IllegalArgumentException("Not comparable offset classes. LongOffset vs " + o.getClass().getName()); - } - LongOffset other = (LongOffset) o; - return this.offset.compareTo(other.offset); - } - - /** - * Helper method to get the minimum offset - * - * @return The minimum offset - */ - public static LongOffset getMinOffset() { - return new LongOffset(Long.MIN_VALUE); - } - - /** - * Helper method to get the maximum offset - * - * @return The maximum offset - */ - public static LongOffset getMaxOffset() { - return new LongOffset(Long.MAX_VALUE); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java b/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java deleted file mode 100644 index 98547e2..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.system.sql; - -/** - * A generic interface extending {@link java.lang.Comparable} to be used as {@code Offset} in a stream - */ -public interface Offset extends Comparable<Offset> { - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java deleted file mode 100644 index 18eb0a3..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.task.sql; - -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.sql.api.operators.OperatorRouter; -import org.apache.samza.sql.api.operators.SimpleOperator; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - - -/** - * This class extends {@link org.apache.samza.task.sql.SimpleMessageCollector} that uses {@link org.apache.samza.sql.api.operators.OperatorRouter} - * - */ -public class RouterMessageCollector extends SimpleMessageCollector { - - private final OperatorRouter rteCntx; - - public RouterMessageCollector(MessageCollector collector, TaskCoordinator coordinator, OperatorRouter rteCntx) { - super(collector, coordinator); - this.rteCntx = rteCntx; - } - - @Override - protected void realSend(Relation rel) throws Exception { - for (SimpleOperator op : this.rteCntx.getNextOperators(rel.getName())) { - op.process(rel, this, coordinator); - } - } - - @Override - protected void realSend(Tuple tuple) throws Exception { - for (SimpleOperator op : this.rteCntx.getNextOperators(tuple.getEntityName())) { - op.process(tuple, this, coordinator); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java deleted file mode 100644 index b29838a..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.task.sql; - -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.sql.api.operators.OperatorCallback; -import org.apache.samza.sql.operators.factory.NoopOperatorCallback; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueIterator; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - - -public class SimpleMessageCollector implements MessageCollector { - protected final MessageCollector collector; - protected final TaskCoordinator coordinator; - protected OperatorCallback callback; - - /** - * Ctor that creates the {@code SimpleMessageCollector} from scratch - * @param collector The {@link org.apache.samza.task.MessageCollector} in the context - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context - * @param callback The {@link org.apache.samza.sql.api.operators.OperatorCallback} in the context - */ - public SimpleMessageCollector(MessageCollector collector, TaskCoordinator coordinator, OperatorCallback callback) { - this.collector = collector; - this.coordinator = coordinator; - if (callback == null) { - this.callback = new NoopOperatorCallback(); - } else { - this.callback = callback; - } - } - - /** - * Ctor that creates the {@code SimpleMessageCollector} from scratch - * @param collector The {@link org.apache.samza.task.MessageCollector} in the context - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context - */ - public SimpleMessageCollector(MessageCollector collector, TaskCoordinator coordinator) { - this.collector = collector; - this.coordinator = coordinator; - } - - /** - * This method swaps the {@code callback} with the new one - * - * <p> This method allows the {@link org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when the collector - * is passed down into the next operator's context. Hence, under the new operator's context, the correct {@link org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation, MessageCollector, TaskCoordinator)}, - * and {@link org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple, MessageCollector, TaskCoordinator)} can be invoked - * - * @param callback The new {@link org.apache.samza.sql.api.operators.OperatorCallback} to be set - */ - public void switchOperatorCallback(OperatorCallback callback) { - this.callback = callback; - } - - /** - * Method is declared to be final s.t. we enforce that the callback functions are called first - */ - final public void send(Relation deltaRelation) throws Exception { - Relation rel = this.callback.afterProcess(deltaRelation, collector, coordinator); - if (rel == null) { - return; - } - this.realSend(rel); - } - - /** - * Method is declared to be final s.t. we enforce that the callback functions are called first - */ - final public void send(Tuple tuple) throws Exception { - Tuple otuple = this.callback.afterProcess(tuple, collector, coordinator); - if (otuple == null) { - return; - } - this.realSend(otuple); - } - - protected void realSend(Relation rel) throws Exception { - for (KeyValueIterator<?, Tuple> iter = rel.all(); iter.hasNext();) { - Entry<?, Tuple> entry = iter.next(); - this.collector.send((OutgoingMessageEnvelope) entry.getValue().getMessage()); - } - } - - protected void realSend(Tuple tuple) throws Exception { - this.collector.send((OutgoingMessageEnvelope) tuple.getMessage()); - } - - @Override - public void send(OutgoingMessageEnvelope envelope) { - this.collector.send(envelope); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java b/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java deleted file mode 100644 index 7412669..0000000 --- a/samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.sql.data.serializers; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.EncoderFactory; -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.apache.samza.serializers.Serde; -import org.apache.samza.sql.data.avro.AvroData; -import org.junit.Assert; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -public class SqlAvroSerdeTest { - public static final String ORDER_SCHEMA = "{\"namespace\": \"org.apache.samza.sql\",\n"+ - " \"type\": \"record\",\n"+ - " \"name\": \"Order\",\n"+ - " \"fields\": [\n"+ - " {\"name\": \"id\", \"type\": \"int\"},\n"+ - " {\"name\": \"product\", \"type\": \"string\"},\n"+ - " {\"name\": \"quantity\", \"type\": \"int\"}\n"+ - " ]\n"+ - "}"; - - public static Schema orderSchema = new Schema.Parser().parse(ORDER_SCHEMA); - private static Serde serde = new SqlAvroSerdeFactory().getSerde("sqlAvro", sqlAvroSerdeTestConfig()); - - @Test - public void testSqlAvroSerdeDeserialization() throws IOException { - AvroData decodedDatum = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema)); - - Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.sql.api.data.Schema.Type.STRUCT); - Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.sql.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.sql.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.sql.api.data.Schema.Type.STRING); - } - - @Test - public void testSqlAvroSerialization() throws IOException { - AvroData decodedDatumOriginal = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema)); - byte[] encodedDatum = serde.toBytes(decodedDatumOriginal); - - AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum); - - Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.sql.api.data.Schema.Type.STRUCT); - Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.sql.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.sql.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.sql.api.data.Schema.Type.STRING); - } - - private static Config sqlAvroSerdeTestConfig(){ - Map<String, String> config = new HashMap<String, String>(); - config.put("serializers.sqlAvro.schema", ORDER_SCHEMA); - - return new MapConfig(config); - } - - private static byte[] encodeMessage(GenericRecord datum, Schema avroSchema) throws IOException { - DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, null); - writer.write(datum, encoder); - encoder.flush(); - - return output.toByteArray(); - } - - private static GenericRecord sampleOrderRecord(){ - GenericData.Record datum = new GenericData.Record(orderSchema); - datum.put("id", 1); - datum.put("product", "paint"); - datum.put("quantity", 3); - - return datum; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java deleted file mode 100644 index 20dc701..0000000 --- a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.task.sql; - -import org.apache.samza.config.Config; -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.sql.api.operators.OperatorCallback; -import org.apache.samza.sql.data.IncomingMessageTuple; -import org.apache.samza.sql.operators.window.BoundedTimeWindow; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.task.InitableTask; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.task.WindowableTask; - - -/*** - * This example illustrate a use case for the full-state timed window operator - * - */ -public class RandomWindowOperatorTask implements StreamTask, InitableTask, WindowableTask { - private BoundedTimeWindow wndOp; - - private final OperatorCallback wndCallback = new OperatorCallback() { - - @Override - public Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) { - return tuple; - } - - @Override - public Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) { - return rel; - } - - @Override - public Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) { - return rel; - } - - @Override - public Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) { - return filterWindowOutput(tuple, collector, coordinator); - } - - private Tuple filterWindowOutput(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) { - // filter all delete tuples before send - if (tuple.isDelete()) { - return null; - } - return tuple; - } - - }; - - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) - throws Exception { - // based on tuple's stream name, get the window op and run process() - wndOp.process(new IncomingMessageTuple(envelope), collector, coordinator); - - } - - @Override - public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { - // based on tuple's stream name, get the window op and run process() - wndOp.refresh(System.nanoTime(), collector, coordinator); - } - - @Override - public void init(Config config, TaskContext context) throws Exception { - // 1. create a fixed length 10 sec window operator - this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "relation1", this.wndCallback); - this.wndOp.init(config, context); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java deleted file mode 100644 index 9124e3c..0000000 --- a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.task.sql; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.samza.config.Config; -import org.apache.samza.sql.data.IncomingMessageTuple; -import org.apache.samza.sql.operators.factory.SimpleRouter; -import org.apache.samza.sql.operators.join.StreamStreamJoin; -import org.apache.samza.sql.operators.partition.PartitionOp; -import org.apache.samza.sql.operators.window.BoundedTimeWindow; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.task.InitableTask; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.task.WindowableTask; - - -/*** - * This example illustrate a SQL join operation that joins two streams together using the folowing operations: - * <ul> - * <li>a. the two streams are each processed by a window operator to convert to relations - * <li>b. a join operator is applied on the two relations to generate join results - * <li>c. an istream operator is applied on join output and convert the relation into a stream - * <li>d. a partition operator that re-partitions the output stream from istream and send the stream to system output - * </ul> - * - * This example also uses an implementation of <code>SqlMessageCollector</code> (@see <code>OperatorMessageCollector</code>) - * that uses <code>OperatorRouter</code> to automatically execute the whole paths that connects operators together. - */ -public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask { - - private SimpleRouter rteCntx; - - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) - throws Exception { - this.rteCntx.process(new IncomingMessageTuple(envelope), collector, coordinator); - } - - @Override - public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { - this.rteCntx.refresh(System.nanoTime(), collector, coordinator); - } - - @Override - public void init(Config config, TaskContext context) throws Exception { - // create all operators via the operator factory - // 1. create two window operators - BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, "inputStream1", "fixedWndOutput1"); - BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, "inputStream2", "fixedWndOutput2"); - // 2. create one join operator - @SuppressWarnings("serial") - List<String> inputRelations = new ArrayList<String>() { - { - add("fixedWndOutput1"); - add("fixedWndOutput2"); - } - }; - @SuppressWarnings("serial") - List<String> joinKeys = new ArrayList<String>() { - { - add("key1"); - add("key2"); - } - }; - StreamStreamJoin join = new StreamStreamJoin("joinOp", inputRelations, "joinOutput", joinKeys); - // 4. create a re-partition operator - PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka", "parOutputStrm1", "joinKey", 50); - - // Now, connecting the operators via the OperatorRouter - this.rteCntx = new SimpleRouter(); - // 1. set two system input operators (i.e. two window operators) - this.rteCntx.addOperator(wnd1); - this.rteCntx.addOperator(wnd2); - // 2. connect join operator to both window operators - this.rteCntx.addOperator(join); - // 3. connect re-partition operator to the stream operator - this.rteCntx.addOperator(par); - - this.rteCntx.init(config, context); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java deleted file mode 100644 index 96e96c3..0000000 --- a/samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.task.sql; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.samza.config.Config; -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Stream; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.sql.api.operators.OperatorCallback; -import org.apache.samza.sql.data.IncomingMessageTuple; -import org.apache.samza.sql.operators.factory.SimpleRouter; -import org.apache.samza.sql.operators.join.StreamStreamJoin; -import org.apache.samza.sql.operators.partition.PartitionOp; -import org.apache.samza.sql.operators.window.BoundedTimeWindow; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.task.InitableTask; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.task.WindowableTask; - - -/*** - * This example illustrate a SQL join operation that joins two streams together using the folowing operations: - * <ul> - * <li>a. the two streams are each processed by a window operator to convert to relations - * <li>b. a join operator is applied on the two relations to generate join results - * <li>c. an istream operator is applied on join output and convert the relation into a stream - * <li>d. a partition operator that re-partitions the output stream from istream and send the stream to system output - * </ul> - * - * This example also uses an implementation of <code>SqlMessageCollector</code> (@see <code>OperatorMessageCollector</code>) - * that uses <code>OperatorRouter</code> to automatically execute the whole paths that connects operators together. - */ -public class UserCallbacksSqlTask implements StreamTask, InitableTask, WindowableTask { - - private SimpleRouter simpleRtr; - - private final OperatorCallback wndCallback = new OperatorCallback() { - - @Override - public Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) { - return filterWindowInput(tuple, collector, coordinator); - } - - @Override - public Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) { - return onRelationInput(rel, collector, coordinator); - } - - @Override - public Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) { - return rel; - } - - @Override - public Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) { - return tuple; - } - - private Tuple filterWindowInput(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) { - // filter all delete tuples before send - if (tuple.isDelete()) { - return null; - } - return tuple; - } - - private Relation onRelationInput(Relation rel, MessageCollector collector, - TaskCoordinator coordinator) { - // check whether the input is a stream - if (!(rel instanceof Stream<?>)) { - throw new IllegalArgumentException("Wrong input entity"); - } - return rel; - } - }; - - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) - throws Exception { - this.simpleRtr.process(new IncomingMessageTuple(envelope), collector, coordinator); - } - - @Override - public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { - this.simpleRtr.refresh(System.nanoTime(), collector, coordinator); - } - - @Override - public void init(Config config, TaskContext context) throws Exception { - // create all operators via the operator factory - // 1. create two window operators - BoundedTimeWindow wnd1 = - new BoundedTimeWindow("fixedWnd1", 10, "inputStream1", "fixedWndOutput1", this.wndCallback); - BoundedTimeWindow wnd2 = - new BoundedTimeWindow("fixedWnd2", 10, "inputStream2", "fixedWndOutput2", this.wndCallback); - // 2. create one join operator - @SuppressWarnings("serial") - List<String> inputRelations = new ArrayList<String>() { - { - add("fixedWndOutput1"); - add("fixedWndOutput2"); - } - }; - @SuppressWarnings("serial") - List<String> joinKeys = new ArrayList<String>() { - { - add("key1"); - add("key2"); - } - }; - StreamStreamJoin join = new StreamStreamJoin("joinOp", inputRelations, "joinOutput", joinKeys); - // 4. create a re-partition operator - PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka", "parOutputStrm1", "joinKey", 50); - - // Now, connecting the operators via the OperatorRouter - this.simpleRtr = new SimpleRouter(); - // 1. set two system input operators (i.e. two window operators) - this.simpleRtr.addOperator(wnd1); - this.simpleRtr.addOperator(wnd2); - // 2. connect join operator to both window operators - this.simpleRtr.addOperator(join); - // 3. connect re-partition operator to the stream operator - this.simpleRtr.addOperator(par); - - this.simpleRtr.init(config, context); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index f1903c6..fde8ab5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -23,7 +23,7 @@ include \ 'samza-shell' def jdk8Modules = [ - 'samza-sql-core', + 'samza-operator', 'samza-sql-calcite' ] as HashSet
