http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java deleted file mode 100644 index 1e8f192..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.data; - -import java.util.Map; - - -public interface Schema { - - enum Type { - INTEGER, - LONG, - FLOAT, - DOUBLE, - BOOLEAN, - STRING, - BYTES, - STRUCT, - ARRAY, - MAP - }; - - Type getType(); - - Schema getElementType(); - - Schema getValueType(); - - Map<String, Schema> getFields(); - - Schema getFieldType(String fldName); - - Data read(Object object); - - Data transform(Data inputData); - - boolean equals(Schema other); -}
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java deleted file mode 100644 index 931705e..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.data; - -import java.util.List; - - -/** - * This interface defines an ordered {@link org.apache.samza.sql.api.data.Relation}, which has an ordered key. - * - * <p> This is to define a stream created by CREATE STREAM statement - * - * @param <K> The ordered key for the {@code Stream} class - */ -public interface Stream<K extends Comparable<?>> extends Relation<K> { - /** - * Get the list of field names used as the order keys for this stream - * - * @return The list of field names used to construct the order key for the stream - */ - List<String> getOrderFields(); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java deleted file mode 100644 index 7b4d984..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.data; - -/** - * This interface defines a non-ordered {@link org.apache.samza.sql.api.data.Relation}, which has a unique primary key - * - * <p> This is to define a table created by CREATE TABLE statement - * - * @param <K> The primary key for the {@code Table} class - */ -public interface Table<K> extends Relation<K> { - - /** - * Get the primary key field name for this table - * - * @return The name of the primary key field - */ - String getPrimaryKeyName(); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java deleted file mode 100644 index bea922b..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.data; - -import org.apache.samza.system.sql.Offset; - - -/** - * This class defines the generic interface of <code>Tuple</code>, which is a entry from the incoming stream, or one row in a <code>Relation</code>. - * - * <p>The <code>Tuple</code> models the basic operatible unit in streaming SQL processes in Samza. - * - */ -public interface Tuple { - - /** - * Access method to get the corresponding message body in the tuple - * - * @return Message object in the tuple - */ - Data getMessage(); - - /** - * Method to indicate whether the tuple is a delete tuple or an insert tuple - * - * @return A boolean value indicates whether the current tuple is a delete or insert message - */ - boolean isDelete(); - - /** - * Access method to the key of the tuple - * - * @return The <code>key</code> of the tuple - */ - Data getKey(); - - /** - * Get the stream name of the tuple. Note this stream name should be unique in the system. - * - * @return The stream name which this tuple belongs to - */ - EntityName getEntityName(); - - /** - * Get the message creation timestamp of the tuple. - * - * @return The tuple's creation timestamp in nano seconds. - */ - long getCreateTimeNano(); - - /** - * Get the offset of the tuple in the stream. This should be used to uniquely identify a tuple in a stream. - * - * @return The offset of the tuple in the stream. - */ - Offset getOffset(); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java deleted file mode 100644 index d6f6b57..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.operators; - -import org.apache.samza.config.Config; -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - - -public interface Operator { - /** - * Method to initialize the operator - * - * @param config The configuration object - * @param context The task context - * @throws Exception Throws Exception if failed to initialize the operator - */ - void init(Config config, TaskContext context) throws Exception; - - /** - * Method to perform a relational logic on the input relation - * - * <p> The actual implementation of relational logic is performed by the implementation of this method. - * - * @param deltaRelation The changed rows in the input relation, including the inserts/deletes/updates - * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context - * @throws Exception Throws exception if failed - */ - void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator) - throws Exception; - - /** - * Method to process on an input tuple. - * - * @param tuple The input tuple, which has the incoming message from a stream - * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context - * @throws Exception Throws exception if failed - */ - void process(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) throws Exception; - - /** - * Method to refresh the result when a timer expires - * - * @param timeNano The current system time in nano second - * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context - * @throws Exception Throws exception if failed - */ - void refresh(long timeNano, MessageCollector collector, TaskCoordinator coordinator) throws Exception; - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java deleted file mode 100644 index fb2aa89..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.sql.api.operators; - -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - - -/** - * Defines the callback functions to allow customized functions to be invoked before process and before sending the result - */ -public interface OperatorCallback { - /** - * Method to be invoked before the operator actually process the input tuple - * - * @param tuple The incoming tuple - * @param collector The {@link org.apache.samza.task.MessageCollector} in context - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context - * @return The tuple to be processed; return {@code null} if there is nothing to be processed - */ - Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator); - - /** - * Method to be invoked before the operator actually process the input relation - * - * @param rel The input relation - * @param collector The {@link org.apache.samza.task.MessageCollector} in context - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context - * @return The relation to be processed; return {@code null} if there is nothing to be processed - */ - Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator); - - /** - * Method to be invoked before the operator's output tuple is sent - * - * @param tuple The output tuple - * @param collector The {@link org.apache.samza.task.MessageCollector} in context - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context - * @return The tuple to be sent; return {@code null} if there is nothing to be sent - */ - Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator); - - /** - * Method to be invoked before the operator's output relation is sent - * - * @param rel The output relation - * @param collector The {@link org.apache.samza.task.MessageCollector} in context - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context - * @return The relation to be sent; return {@code null} if there is nothing to be sent - */ - Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator); -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java deleted file mode 100644 index 0759638..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.operators; - -import java.util.List; - -import org.apache.samza.sql.api.data.EntityName; - - -/** - * This interface class defines interface methods to connect {@link org.apache.samza.sql.api.operators.SimpleOperator}s together into a composite operator. - * - * <p>The {@code OperatorRouter} allows the user to attach operators to a {@link org.apache.samza.sql.api.data.Table} or - * a {@link org.apache.samza.sql.api.data.Stream} entity, if the corresponding table/stream is included as inputs to the operator. - * Each operator then executes its own logic and determines which table/stream to emit the output to. Through the {@code OperatorRouter}, - * the next operators attached to the corresponding output entities (i.e. table/streams) can then be invoked to continue the - * stream process task. - */ -public interface OperatorRouter extends Operator { - - /** - * This method adds a {@link org.apache.samza.sql.api.operators.SimpleOperator} to the {@code OperatorRouter}. - * - * @param nextOp The {@link org.apache.samza.sql.api.operators.SimpleOperator} to be added - * @throws Exception Throws exception if failed - */ - void addOperator(SimpleOperator nextOp) throws Exception; - - /** - * This method gets the list of {@link org.apache.samza.sql.api.operators.SimpleOperator}s attached to an output entity (of any type) - * - * @param output The identifier of the output entity - * @return The list of {@link org.apache.samza.sql.api.operators.SimpleOperator} taking {@code output} as input table/stream - */ - List<SimpleOperator> getNextOperators(EntityName output); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java deleted file mode 100644 index 4d670fd..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.operators; - -import java.util.List; - -import org.apache.samza.sql.api.data.EntityName; - - -/** - * This class defines a generic specification interface class for all {@link org.apache.samza.sql.api.operators.SimpleOperator}s. - * - * <p>The purpose of this class is to encapsulate all the details of configuration/parameters of a specific implementation of an operator. - * - * <p>The generic methods for an operator specification is to provide methods to get the unique ID, the list of entity names (i.e. stream name - * in {@link org.apache.samza.sql.api.data.Table} or {@link org.apache.samza.sql.api.data.Stream} name) of input variables , and the list of entity names of the output variables. - * - */ -public interface OperatorSpec { - /** - * Interface method that returns the unique ID of the operator in a task - * - * @return The unique ID of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object - */ - String getId(); - - /** - * Access method to the list of entity names of input variables. - * - * @return A list of entity names of the inputs - */ - List<EntityName> getInputNames(); - - /** - * Access method to the list of entity name of the output variable - * - * @return The entity name of the output - * - */ - List<EntityName> getOutputNames(); -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java deleted file mode 100644 index c49a822..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.operators; - - - -/** - * The interface for a {@code SimpleOperator} that implements a simple primitive relational logic operation - */ -public interface SimpleOperator extends Operator { - /** - * Method to get the specification of this {@code SimpleOperator} - * - * @return The {@link org.apache.samza.sql.api.operators.OperatorSpec} object that defines the configuration/parameters of the operator - */ - OperatorSpec getSpec(); -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java deleted file mode 100644 index 6f8d93b..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.operators; - - - -/** - * This class defines the interface of SQL operator factory, which creates the {@link org.apache.samza.sql.api.operators.SimpleOperator}s: - */ -public interface SqlOperatorFactory { - - /** - * Interface method to create/get the {@link org.apache.samza.sql.api.operators.SimpleOperator} object - * - * @param spec The specification of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object - * @return The {@link org.apache.samza.sql.api.operators.SimpleOperator} object - */ - SimpleOperator getOperator(OperatorSpec spec); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java deleted file mode 100644 index 72a59f2..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.sql.data; - -import org.apache.samza.sql.api.data.Data; -import org.apache.samza.sql.api.data.EntityName; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.sql.LongOffset; -import org.apache.samza.system.sql.Offset; - - -/** - * This class implements a {@link org.apache.samza.sql.api.data.Tuple} that encapsulates an {@link org.apache.samza.system.IncomingMessageEnvelope} from the system - * - */ -public class IncomingMessageTuple implements Tuple { - /** - * Incoming message envelope - */ - private final IncomingMessageEnvelope imsg; - - /** - * The entity name for the incoming system stream - */ - private final EntityName strmEntity; - - /** - * The receive time of this incoming message - */ - private final long recvTimeNano; - - /** - * Ctor to create a {@code IncomingMessageTuple} from {@link org.apache.samza.system.IncomingMessageEnvelope} - * - * @param imsg The incoming system message - */ - public IncomingMessageTuple(IncomingMessageEnvelope imsg) { - this.imsg = imsg; - this.strmEntity = - EntityName.getStreamName(String.format("%s:%s", imsg.getSystemStreamPartition().getSystem(), imsg - .getSystemStreamPartition().getStream())); - this.recvTimeNano = System.nanoTime(); - } - - @Override - public Data getMessage() { - return (Data) this.imsg.getMessage(); - } - - @Override - public boolean isDelete() { - return false; - } - - @Override - public Data getKey() { - return (Data) this.imsg.getKey(); - } - - @Override - public EntityName getEntityName() { - return this.strmEntity; - } - - @Override - public long getCreateTimeNano() { - // TODO: this is wrong and just to keep as an placeholder. It should be replaced by the message publish time when the publish timestamp is available in the message metadata - return this.recvTimeNano; - } - - @Override - public Offset getOffset() { - // TODO: need to add offset factory to generate different types of offset. This is just a placeholder, - // assuming incoming message carries long value as offset (i.e. Kafka case) - return new LongOffset(this.imsg.getOffset()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java deleted file mode 100644 index d040be9..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.avro; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import org.apache.avro.generic.GenericArray; -import org.apache.avro.generic.GenericRecord; -import org.apache.samza.sql.api.data.Data; -import org.apache.samza.sql.api.data.Schema; - - -public class AvroData implements Data { - protected final Object datum; - protected final AvroSchema schema; - - private AvroData(AvroSchema schema, Object datum) { - this.datum = datum; - this.schema = schema; - } - - @Override - public Schema schema() { - return this.schema; - } - - @Override - public Object value() { - return this.datum; - } - - @Override - public int intValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public long longValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public float floatValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public double doubleValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public boolean booleanValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public String strValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public byte[] bytesValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public List<Object> arrayValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public Map<Object, Object> mapValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public Data getElement(int index) { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public Data getFieldData(String fldName) { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - public static AvroData getArray(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.ARRAY) { - throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType()); - } - return new AvroData(schema, datum) { - @SuppressWarnings("unchecked") - private final GenericArray<Object> array = (GenericArray<Object>) this.datum; - - @Override - public List<Object> arrayValue() { - return this.array; - } - - @Override - public Data getElement(int index) { - return this.schema.getElementType().read(array.get(index)); - } - - }; - } - - public static AvroData getMap(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.MAP) { - throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType()); - } - return new AvroData(schema, datum) { - @SuppressWarnings("unchecked") - private final Map<Object, Object> map = (Map<Object, Object>) datum; - - @Override - public Map<Object, Object> mapValue() { - return this.map; - } - - @Override - public Data getFieldData(String fldName) { - // TODO Auto-generated method stub - return this.schema.getValueType().read(map.get(fldName)); - } - - }; - } - - public static AvroData getStruct(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.STRUCT) { - throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType()); - } - return new AvroData(schema, datum) { - private final GenericRecord record = (GenericRecord) datum; - - @Override - public Data getFieldData(String fldName) { - // TODO Auto-generated method stub - return this.schema.getFieldType(fldName).read(record.get(fldName)); - } - - }; - } - - public static AvroData getInt(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public int intValue() { - return ((Integer) datum).intValue(); - } - - }; - } - - public static AvroData getLong(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public long longValue() { - return ((Long) datum).longValue(); - } - - }; - } - - public static AvroData getFloat(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public float floatValue() { - return ((Float) datum).floatValue(); - } - - }; - } - - public static AvroData getDouble(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public double doubleValue() { - return ((Double) datum).doubleValue(); - } - - }; - } - - public static AvroData getBoolean(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public boolean booleanValue() { - return ((Boolean) datum).booleanValue(); - } - - }; - } - - public static AvroData getString(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public String strValue() { - return ((CharSequence) datum).toString(); - } - - }; - } - - public static AvroData getBytes(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public byte[] bytesValue() { - return ((ByteBuffer) datum).array(); - } - - }; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java deleted file mode 100644 index 577cf74..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.avro; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.avro.Schema.Field; -import org.apache.samza.sql.api.data.Data; -import org.apache.samza.sql.api.data.Schema; - - -public class AvroSchema implements Schema { - - protected final org.apache.avro.Schema avroSchema; - protected final Schema.Type type; - - private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas = - new HashMap<org.apache.avro.Schema.Type, AvroSchema>(); - - static { - primSchemas.put(org.apache.avro.Schema.Type.INT, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) { - @Override - public Data read(Object datum) { - return AvroData.getInt(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.LONG, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) { - @Override - public Data read(Object datum) { - return AvroData.getLong(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.FLOAT, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) { - @Override - public Data read(Object datum) { - return AvroData.getFloat(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.DOUBLE, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) { - @Override - public Data read(Object datum) { - return AvroData.getDouble(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) { - @Override - public Data read(Object datum) { - return AvroData.getBoolean(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.STRING, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) { - @Override - public Data read(Object datum) { - return AvroData.getString(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.BYTES, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) { - @Override - public Data read(Object datum) { - return AvroData.getBytes(this, datum); - } - }); - }; - - public static AvroSchema getSchema(final org.apache.avro.Schema schema) { - Schema.Type type = mapType(schema.getType()); - if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) { - return primSchemas.get(schema.getType()); - } - // otherwise, construct the new schema - // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects - switch (type) { - case ARRAY: - return new AvroSchema(schema) { - @Override - public Data transform(Data input) { - // This would get all the elements until the length of the current schema's array length - if (input.schema().getType() != Schema.Type.ARRAY) { - throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " - + input.schema().getType()); - } - if (!input.schema().getElementType().equals(this.getElementType())) { - throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: " - + input.schema().getElementType().getType()); - } - // input type matches array type - return AvroData.getArray(this, input.value()); - } - }; - case MAP: - return new AvroSchema(schema) { - @Override - public Data transform(Data input) { - // This would get all the elements until the length of the current schema's array length - if (input.schema().getType() != Schema.Type.MAP) { - throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " - + input.schema().getType()); - } - if (!input.schema().getValueType().equals(this.getValueType())) { - throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: " - + input.schema().getValueType().getType()); - } - // input type matches map type - return AvroData.getMap(this, input.value()); - } - }; - case STRUCT: - return new AvroSchema(schema) { - @SuppressWarnings("serial") - private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() { - { - for (Field field : schema.getFields()) { - put(field.name(), getSchema(field.schema())); - } - } - }; - - @Override - public Map<String, Schema> getFields() { - return this.fldSchemas; - } - - @Override - public Schema getFieldType(String fldName) { - return this.fldSchemas.get(fldName); - } - - @Override - public Data transform(Data input) { - // This would get all the elements until the length of the current schema's array length - if (input.schema().getType() != Schema.Type.STRUCT) { - throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " - + input.schema().getType()); - } - // Note: this particular transform function only implements "projection to a sub-set" concept. - // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed - for (String fldName : this.fldSchemas.keySet()) { - // check each field schema matches input - Schema fldSchema = this.fldSchemas.get(fldName); - Schema inputFld = input.schema().getFieldType(fldName); - if (!fldSchema.equals(inputFld)) { - throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName - + ". input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType()); - } - } - // input type matches struct type - return AvroData.getStruct(this, input.value()); - } - - }; - default: - throw new IllegalArgumentException("Un-recognized complext data type:" + type); - } - } - - private AvroSchema(org.apache.avro.Schema schema) { - this.avroSchema = schema; - this.type = mapType(schema.getType()); - } - - private static Type mapType(org.apache.avro.Schema.Type type) { - switch (type) { - case ARRAY: - return Schema.Type.ARRAY; - case RECORD: - return Schema.Type.STRUCT; - case MAP: - return Schema.Type.MAP; - case INT: - return Schema.Type.INTEGER; - case LONG: - return Schema.Type.LONG; - case BOOLEAN: - return Schema.Type.BOOLEAN; - case FLOAT: - return Schema.Type.FLOAT; - case DOUBLE: - return Schema.Type.DOUBLE; - case STRING: - return Schema.Type.STRING; - case BYTES: - return Schema.Type.BYTES; - default: - throw new IllegalArgumentException("Avro schema: " + type + " is not supported"); - } - } - - @Override - public Type getType() { - return this.type; - } - - @Override - public Schema getElementType() { - if (this.type != Schema.Type.ARRAY) { - throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type); - } - return getSchema(this.avroSchema.getElementType()); - } - - @Override - public Schema getValueType() { - if (this.type != Schema.Type.MAP) { - throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type); - } - return getSchema(this.avroSchema.getValueType()); - } - - @Override - public Map<String, Schema> getFields() { - throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type); - } - - @Override - public Schema getFieldType(String fldName) { - throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type); - } - - @Override - public Data read(Object object) { - if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) { - return AvroData.getArray(this, object); - } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) { - return AvroData.getMap(this, object); - } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) { - return AvroData.getStruct(this, object); - } - throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported"); - } - - @Override - public Data transform(Data inputData) { - if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP - || inputData.schema().getType() == Schema.Type.STRUCT) { - throw new IllegalArgumentException("Complex schema should have overriden the default transform() function."); - } - if (inputData.schema().getType() != this.type) { - throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type - + ", input type:" + inputData.schema().getType()); - } - return inputData; - } - - @Override - public boolean equals(Schema other) { - // TODO Auto-generated method stub - if (this.type != other.getType()) { - return false; - } - switch (this.type) { - case ARRAY: - // check if element types are the same - return this.getElementType().equals(other.getElementType()); - case MAP: - // check if value types are the same - return this.getValueType().equals(other.getValueType()); - case STRUCT: - // check if the fields schemas in this equals the other - // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform() - for (String fieldName : this.getFields().keySet()) { - if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) { - return false; - } - } - return true; - default: - return true; - } - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java deleted file mode 100644 index f3f7f7d..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.sql.data.serializers; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.Encoder; -import org.apache.avro.io.EncoderFactory; -import org.apache.samza.SamzaException; -import org.apache.samza.serializers.Serde; -import org.apache.samza.sql.data.avro.AvroData; -import org.apache.samza.sql.data.avro.AvroSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -public class SqlAvroSerde implements Serde<AvroData> { - private static Logger log = LoggerFactory.getLogger(SqlAvroSerde.class); - - private final Schema avroSchema; - private final GenericDatumReader<GenericRecord> reader; - private final GenericDatumWriter<Object> writer; - - public SqlAvroSerde(Schema avroSchema) { - this.avroSchema = avroSchema; - this.reader = new GenericDatumReader<GenericRecord>(avroSchema); - this.writer = new GenericDatumWriter<Object>(avroSchema); - } - - @Override - public AvroData fromBytes(byte[] bytes) { - GenericRecord data; - - try { - data = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null)); - return getAvroData(data, avroSchema); - } catch (IOException e) { - String errMsg = "Cannot decode message."; - log.error(errMsg, e); - throw new SamzaException(errMsg, e); - } - } - - @Override - public byte[] toBytes(AvroData object) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); - - try { - writer.write(object.value(), encoder); - encoder.flush(); - return out.toByteArray(); - } catch (IOException e) { - String errMsg = "Cannot perform Avro binary encode."; - log.error(errMsg, e); - throw new SamzaException(errMsg, e); - } - } - - private AvroData getAvroData(GenericRecord data, Schema type){ - AvroSchema schema = AvroSchema.getSchema(type); - switch (type.getType()){ - case RECORD: - return AvroData.getStruct(schema, data); - case ARRAY: - return AvroData.getArray(schema, data); - case MAP: - return AvroData.getMap(schema, data); - case INT: - return AvroData.getInt(schema, data); - case LONG: - return AvroData.getLong(schema, data); - case BOOLEAN: - return AvroData.getBoolean(schema, data); - case FLOAT: - return AvroData.getFloat(schema, data); - case DOUBLE: - return AvroData.getDouble(schema, data); - case STRING: - return AvroData.getString(schema, data); - case BYTES: - return AvroData.getBytes(schema, data); - default: - throw new IllegalArgumentException("Avro schema: " + type + " is not supported"); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java deleted file mode 100644 index aad18f4..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.sql.data.serializers; - -import org.apache.avro.Schema; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.SerdeFactory; -import org.apache.samza.sql.data.avro.AvroData; - -public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> { - public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema"; - - @Override - public Serde<AvroData> getSerde(String name, Config config) { - String avroSchemaStr = config.get(String.format(PROP_AVRO_SCHEMA, name)); - if (avroSchemaStr == null || avroSchemaStr.isEmpty()) { - throw new SamzaException("Cannot find avro schema for SerdeFactory '" + name + "'."); - } - - return new SqlAvroSerde(new Schema.Parser().parse(avroSchemaStr)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java deleted file mode 100644 index 1f0c3b2..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.serializers; - -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.sql.data.string.StringData; - -import java.io.UnsupportedEncodingException; - -public class SqlStringSerde implements Serde<StringData> { - - private final Serde<String> serde; - - public SqlStringSerde(String encoding) { - this.serde = new StringSerde(encoding); - } - - @Override - public StringData fromBytes(byte[] bytes) { - return new StringData(serde.fromBytes(bytes)); - } - - @Override - public byte[] toBytes(StringData object) { - return serde.toBytes(object.strValue()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java deleted file mode 100644 index 2564479..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.serializers; - - -import org.apache.samza.config.Config; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.SerdeFactory; -import org.apache.samza.sql.data.string.StringData; - -public class SqlStringSerdeFactory implements SerdeFactory<StringData> { - @Override - public Serde<StringData> getSerde(String name, Config config) { - return new SqlStringSerde(config.get("encoding", "UTF-8")); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java deleted file mode 100644 index b81d9fa..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.string; - -import org.apache.samza.sql.api.data.Data; -import org.apache.samza.sql.api.data.Schema; - -import java.util.List; -import java.util.Map; - -public class StringData implements Data { - private final Object datum; - private final Schema schema; - - public StringData(Object datum) { - this.datum = datum; - this.schema = new StringSchema(); - } - - @Override - public Schema schema() { - return this.schema; - } - - @Override - public Object value() { - return this.datum; - } - - @Override - public int intValue() { - throw new UnsupportedOperationException("Can't get int value for a string type data"); - } - - @Override - public long longValue() { - throw new UnsupportedOperationException("Can't get long value for a string type data"); - } - - @Override - public float floatValue() { - throw new UnsupportedOperationException("Can't get float value for a string type data"); - } - - @Override - public double doubleValue() { - throw new UnsupportedOperationException("Can't get double value for a string type data"); - } - - @Override - public boolean booleanValue() { - throw new UnsupportedOperationException("Can't get boolean value for a string type data"); - } - - @Override - public String strValue() { - return String.valueOf(datum); - } - - @Override - public byte[] bytesValue() { - throw new UnsupportedOperationException("Can't get bytesValue for a string type data"); - } - - @Override - public List<Object> arrayValue() { - throw new UnsupportedOperationException("Can't get arrayValue for a string type data"); - } - - @Override - public Map<Object, Object> mapValue() { - throw new UnsupportedOperationException("Can't get mapValue for a string type data"); - } - - @Override - public Data getElement(int index) { - throw new UnsupportedOperationException("Can't getElement(index) on a string type data"); - } - - @Override - public Data getFieldData(String fldName) { - throw new UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data"); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java deleted file mode 100644 index 348fc0c..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.string; - -import org.apache.samza.sql.api.data.Data; -import org.apache.samza.sql.api.data.Schema; - -import java.util.Map; - -public class StringSchema implements Schema { - private Type type = Type.STRING; - - @Override - public Type getType() { - return Type.STRING; - } - - @Override - public Schema getElementType() { - throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type); - } - - @Override - public Schema getValueType() { - throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type); - } - - @Override - public Map<String, Schema> getFields() { - throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type); - } - - @Override - public Schema getFieldType(String fldName) { - throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type); - } - - @Override - public Data read(Object object) { - return new StringData(object); - } - - @Override - public Data transform(Data inputData) { - if (inputData.schema().getType() != this.type) { - throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type - + ", input type:" + inputData.schema().getType()); - } - return inputData; - } - - @Override - public boolean equals(Schema other) { - return other.getType() == this.type; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java deleted file mode 100644 index c3d2266..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.sql.operators.factory; - -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.sql.api.operators.OperatorCallback; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - - -public final class NoopOperatorCallback implements OperatorCallback { - - @Override - public Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) { - return tuple; - } - - @Override - public Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) { - return rel; - } - - @Override - public Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) { - return tuple; - } - - @Override - public Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) { - return rel; - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java deleted file mode 100644 index cbc84d0..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.operators.factory; - -import org.apache.samza.sql.api.operators.OperatorSpec; -import org.apache.samza.sql.api.operators.SimpleOperator; -import org.apache.samza.sql.api.operators.SqlOperatorFactory; -import org.apache.samza.sql.operators.join.StreamStreamJoin; -import org.apache.samza.sql.operators.join.StreamStreamJoinSpec; -import org.apache.samza.sql.operators.partition.PartitionOp; -import org.apache.samza.sql.operators.partition.PartitionSpec; -import org.apache.samza.sql.operators.window.BoundedTimeWindow; -import org.apache.samza.sql.operators.window.WindowSpec; - - -/** - * This simple factory class provides method to create the build-in operators per operator specification. - * It can be extended when the build-in operators expand. - * - */ -public class SimpleOperatorFactoryImpl implements SqlOperatorFactory { - - @Override - public SimpleOperator getOperator(OperatorSpec spec) { - if (spec instanceof PartitionSpec) { - return new PartitionOp((PartitionSpec) spec); - } else if (spec instanceof StreamStreamJoinSpec) { - return new StreamStreamJoin((StreamStreamJoinSpec) spec); - } else if (spec instanceof WindowSpec) { - return new BoundedTimeWindow((WindowSpec) spec); - } - throw new UnsupportedOperationException("Unsupported operator specified: " + spec.getClass().getCanonicalName()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java deleted file mode 100644 index e66451f..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.operators.factory; - -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.sql.api.operators.OperatorCallback; -import org.apache.samza.sql.api.operators.OperatorSpec; -import org.apache.samza.sql.api.operators.SimpleOperator; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.task.sql.SimpleMessageCollector; - - -/** - * An abstract class that encapsulate the basic information and methods that all operator classes should implement. - * It implements the interface {@link org.apache.samza.sql.api.operators.SimpleOperator} - * - */ -public abstract class SimpleOperatorImpl implements SimpleOperator { - /** - * The specification of this operator - */ - private final OperatorSpec spec; - - /** - * The callback function - */ - private final OperatorCallback callback; - - /** - * Ctor of {@code SimpleOperatorImpl} class - * - * @param spec The specification of this operator - */ - public SimpleOperatorImpl(OperatorSpec spec) { - this(spec, new NoopOperatorCallback()); - } - - public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback callback) { - this.spec = spec; - this.callback = callback; - } - - @Override - public OperatorSpec getSpec() { - return this.spec; - } - - /** - * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation, MessageCollector, TaskCoordinator)} - * and real processing of the input relation is fixed. - */ - @Override - final public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator) - throws Exception { - Relation rel = this.callback.beforeProcess(deltaRelation, collector, coordinator); - if (rel == null) { - return; - } - this.realProcess(rel, getCollector(collector, coordinator), coordinator); - } - - /** - * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, MessageCollector, TaskCoordinator)} - * and real processing of the input tuple is fixed. - */ - @Override - final public void process(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) throws Exception { - Tuple ituple = this.callback.beforeProcess(tuple, collector, coordinator); - if (ituple == null) { - return; - } - this.realProcess(ituple, getCollector(collector, coordinator), coordinator); - } - - /** - * This method is made final s.t. we enforce the invocation of {@code SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} before doing anything futher - */ - @Override - final public void refresh(long timeNano, MessageCollector collector, TaskCoordinator coordinator) throws Exception { - this.realRefresh(timeNano, getCollector(collector, coordinator), coordinator); - } - - private SimpleMessageCollector getCollector(MessageCollector collector, TaskCoordinator coordinator) { - if (!(collector instanceof SimpleMessageCollector)) { - return new SimpleMessageCollector(collector, coordinator, this.callback); - } else { - ((SimpleMessageCollector) collector).switchOperatorCallback(this.callback); - return (SimpleMessageCollector) collector; - } - } - - /** - * Method to be overriden by each specific implementation class of operator to handle timeout event - * - * @param timeNano The time in nanosecond when the timeout event occurred - * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context - * @throws Exception Throws exception if failed to refresh the results - */ - protected abstract void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception; - - /** - * Method to be overriden by each specific implementation class of operator to perform relational logic operation on an input {@link org.apache.samza.sql.api.data.Relation} - * - * @param rel The input relation - * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context - * @throws Exception - */ - protected abstract void realProcess(Relation rel, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception; - - protected abstract void realProcess(Tuple ituple, SimpleMessageCollector collector, TaskCoordinator coordinator) - throws Exception; - -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java deleted file mode 100644 index 56753b6..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.sql.operators.factory; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.samza.sql.api.data.EntityName; -import org.apache.samza.sql.api.operators.OperatorSpec; - - -/** - * An abstract class that encapsulate the basic information and methods that all specification of operators should implement. - * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec} - */ -public abstract class SimpleOperatorSpec implements OperatorSpec { - /** - * The identifier of the corresponding operator - */ - private final String id; - - /** - * The list of input entity names of the corresponding operator - */ - private final List<EntityName> inputs = new ArrayList<EntityName>(); - - /** - * The list of output entity names of the corresponding operator - */ - private final List<EntityName> outputs = new ArrayList<EntityName>(); - - /** - * Ctor of the {@code SimpleOperatorSpec} for simple {@link org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one output - * - * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object - * @param input The only input entity - * @param output The only output entity - */ - public SimpleOperatorSpec(String id, EntityName input, EntityName output) { - this.id = id; - this.inputs.add(input); - this.outputs.add(output); - } - - /** - * Ctor of {@code SimpleOperatorSpec} with general format: m inputs and n outputs - * - * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object - * @param inputs The list of input entities - * @param output The list of output entities - */ - public SimpleOperatorSpec(String id, List<EntityName> inputs, EntityName output) { - this.id = id; - this.inputs.addAll(inputs); - this.outputs.add(output); - } - - @Override - public String getId() { - return this.id; - } - - @Override - public List<EntityName> getInputNames() { - return this.inputs; - } - - @Override - public List<EntityName> getOutputNames() { - return this.outputs; - } - - /** - * Method to get the first output entity - * - * @return The first output entity name - */ - public EntityName getOutputName() { - return this.outputs.get(0); - } - - /** - * Method to get the first input entity - * - * @return The first input entity name - */ - public EntityName getInputName() { - return this.inputs.get(0); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java ---------------------------------------------------------------------- diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java deleted file mode 100644 index e570897..0000000 --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.operators.factory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.samza.config.Config; -import org.apache.samza.sql.api.data.EntityName; -import org.apache.samza.sql.api.data.Relation; -import org.apache.samza.sql.api.data.Tuple; -import org.apache.samza.sql.api.operators.Operator; -import org.apache.samza.sql.api.operators.OperatorRouter; -import org.apache.samza.sql.api.operators.SimpleOperator; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.task.sql.RouterMessageCollector; - - -/** - * Example implementation of {@link org.apache.samza.sql.api.operators.OperatorRouter} - * - */ -public final class SimpleRouter implements OperatorRouter { - /** - * List of operators added to the {@link org.apache.samza.sql.api.operators.OperatorRouter} - */ - private List<SimpleOperator> operators = new ArrayList<SimpleOperator>(); - - @SuppressWarnings("rawtypes") - /** - * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list of operators associated with it - */ - private Map<EntityName, List> nextOps = new HashMap<EntityName, List>(); - - /** - * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs to this {@code SimpleRouter} - */ - private Set<EntityName> inputEntities = new HashSet<EntityName>(); - - /** - * Set of entities that are not input entities to this {@code SimpleRouter} - */ - private Set<EntityName> outputEntities = new HashSet<EntityName>(); - - @SuppressWarnings("unchecked") - private void addOperator(EntityName input, SimpleOperator nextOp) { - if (nextOps.get(input) == null) { - nextOps.put(input, new ArrayList<Operator>()); - } - nextOps.get(input).add(nextOp); - operators.add(nextOp); - // get the operator spec - for (EntityName output : nextOp.getSpec().getOutputNames()) { - if (inputEntities.contains(output)) { - inputEntities.remove(output); - } - outputEntities.add(output); - } - if (!outputEntities.contains(input)) { - inputEntities.add(input); - } - } - - @Override - @SuppressWarnings("unchecked") - public List<SimpleOperator> getNextOperators(EntityName entity) { - return nextOps.get(entity); - } - - @Override - public void addOperator(SimpleOperator nextOp) { - List<EntityName> inputs = nextOp.getSpec().getInputNames(); - for (EntityName input : inputs) { - addOperator(input, nextOp); - } - } - - @Override - public void init(Config config, TaskContext context) throws Exception { - for (SimpleOperator op : this.operators) { - op.init(config, context); - } - } - - @Override - public void process(Tuple ituple, MessageCollector collector, TaskCoordinator coordinator) throws Exception { - MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this); - for (Iterator<SimpleOperator> iter = this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) { - iter.next().process(ituple, opCollector, coordinator); - } - } - - @SuppressWarnings("rawtypes") - @Override - public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator) throws Exception { - MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this); - for (Iterator<SimpleOperator> iter = this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();) { - iter.next().process(deltaRelation, opCollector, coordinator); - } - } - - @Override - public void refresh(long nanoSec, MessageCollector collector, TaskCoordinator coordinator) throws Exception { - MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this); - for (EntityName entity : inputEntities) { - for (Iterator<SimpleOperator> iter = this.getNextOperators(entity).iterator(); iter.hasNext();) { - iter.next().refresh(nanoSec, opCollector, coordinator); - } - } - } - -}
