http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java new file mode 100644 index 0000000..e202c20 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.api.internal; + +import org.apache.samza.operators.api.data.Message; + + +/** + * This class defines the specific type of output messages from a {@link Operators.WindowOperator} function + * + * @param <K> the type of key in the output window result + * @param <M> the type of value in the output window result + */ +public final class WindowOutput<K, M> implements Message<K, M> { + private final K key; + private final M value; + + WindowOutput(K key, M aggregated) { + this.key = key; + this.value = aggregated; + } + + @Override public M getMessage() { + return this.value; + } + + @Override public K getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return 0; + } + + static public <K, M> WindowOutput<K, M> of(K key, M result) { + return new WindowOutput<>(key, result); + } +} +
http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java new file mode 100644 index 0000000..49cfdeb --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Implementation class for a chain of operators from the single input {@code source} + * + * @param <M> type of message in the input stream {@code source} + */ +public class ChainedOperators<M extends Message> { + + /** + * Private constructor + * + * @param source the input source {@link MessageStream} + * @param context the {@link TaskContext} object that we need to instantiate the state stores + */ + private ChainedOperators(MessageStream<M> source, TaskContext context) { + // create the pipeline/topology starting from source + // pass in the context s.t. stateful stream operators can initialize their stores + } + + /** + * Static method to create a {@link ChainedOperators} from the {@code source} stream + * + * @param source the input source {@link MessageStream} + * @param context the {@link TaskContext} object used to initialize the {@link StateStoreImpl} + * @param <M> the type of input {@link Message} + * @return a {@link ChainedOperators} object takes the {@code source} as input + */ + public static <M extends Message> ChainedOperators create(MessageStream<M> source, TaskContext context) { + return new ChainedOperators<>(source, context); + } + + /** + * Method to navigate the incoming {@code message} through the processing chains + * + * @param message the incoming message to this {@link ChainedOperators} + * @param collector the {@link MessageCollector} object within the process context + * @param coordinator the {@link TaskCoordinator} object within the process context + */ + public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + // TODO: add implementation of onNext() that actually triggers the process pipeline + } 
+ + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + // TODO: add implementation of onTimer() that actually calls the corresponding window operator's onTimer() methods + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java new file mode 100644 index 0000000..81a7ede --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.reactivestreams.Processor; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.HashSet; +import java.util.Set; + + +/** + * Abstract base class for all stream operator implementation classes. + */ +public abstract class OperatorImpl<M extends Message, RM extends Message> + implements Processor<ProcessorContext<M>, ProcessorContext<RM>> { + + private final Set<Subscriber<? super ProcessorContext<RM>>> subscribers = new HashSet<>(); + + @Override public void subscribe(Subscriber<? super ProcessorContext<RM>> s) { + // Only add once + subscribers.add(s); + } + + @Override public void onSubscribe(Subscription s) { + + } + + @Override public void onNext(ProcessorContext<M> o) { + + onNext(o.getMessage(), o.getCollector(), o.getCoordinator()); + } + + @Override public void onError(Throwable t) { + + } + + @Override public void onComplete() { + + } + + /** + * Each sub-class will implement this method to actually perform the transformation and call the downstream subscribers. 
+ * + * @param message the input {@link Message} + * @param collector the {@link MessageCollector} in the context + * @param coordinator the {@link TaskCoordinator} in the context + */ + protected abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator); + + /** + * Stateful operators will need to override this method to initialize the operators + * + * @param context the task context to initialize the operators within + */ + protected void init(TaskContext context) {}; + + /** + * Method to trigger all downstream operators that consumes the output {@link org.apache.samza.operators.api.MessageStream} + * from this operator + * + * @param omsg output {@link Message} + * @param collector the {@link MessageCollector} in the context + * @param coordinator the {@link TaskCoordinator} in the context + */ + protected void nextProcessors(RM omsg, MessageCollector collector, TaskCoordinator coordinator) { + subscribers.forEach(sub -> + sub.onNext(new ProcessorContext<>(omsg, collector, coordinator)) + ); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java new file mode 100644 index 0000000..5a375bc --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Wrapper class to be used by {@link OperatorImpl} + * + * @param <M> Type of input stream {@link Message} + */ +public class ProcessorContext<M extends Message> { + private final M message; + private final MessageCollector collector; + private final TaskCoordinator coordinator; + + ProcessorContext(M message, MessageCollector collector, TaskCoordinator coordinator) { + this.message = message; + this.collector = collector; + this.coordinator = coordinator; + } + + M getMessage() { + return this.message; + } + + MessageCollector getCollector() { + return this.collector; + } + + TaskCoordinator getCoordinator() { + return this.coordinator; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java new file mode 100644 index 0000000..b29d9c8 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.api.internal.Operators.StreamOperator; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + +import java.util.Collection; +import java.util.function.Function; + + +/** + * Base class for all implementation of operators + * + * @param <M> type of message in the input stream + * @param <RM> type of message in the output stream + */ +public class SimpleOperatorImpl<M extends Message, RM extends Message> extends OperatorImpl<M, RM> { + + private final Function<M, Collection<RM>> transformFn; + + SimpleOperatorImpl(StreamOperator<M, RM> op) { + super(); + this.transformFn = op.getFunction(); + } + + @Override protected void onNext(M imsg, MessageCollector collector, TaskCoordinator coordinator) { + // actually calling the transform function and then for each output, call nextProcessors() + this.transformFn.apply(imsg).forEach(r -> this.nextProcessors(r, collector, coordinator)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java ---------------------------------------------------------------------- diff 
--git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java new file mode 100644 index 0000000..5d25cfa --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.internal.Operators.SinkOperator; +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Implementation for {@link SinkOperator} + */ +public class SinkOperatorImpl<M extends Message> extends OperatorImpl<M, Message> { + private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFunc; + + SinkOperatorImpl(SinkOperator<M> sinkOp) { + this.sinkFunc = sinkOp.getFunction(); + } + + @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + this.sinkFunc.apply(message, collector, coordinator); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java new file mode 100644 index 0000000..f573fd0 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.api.internal.Operators.StoreFunctions; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.task.TaskContext; + + +/** + * The base class for all state stores + */ +public class StateStoreImpl<M extends Message, SK, SS> { + private final String storeName; + private final StoreFunctions<M, SK, SS> storeFunctions; + private KeyValueStore<SK, SS> kvStore = null; + + public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) { + this.storeFunctions = store; + this.storeName = storeName; + } + + public void init(TaskContext context) { + this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName); + } + + public Entry<SK, SS> getState(M m) { + SK key = this.storeFunctions.getStoreKeyFinder().apply(m); + SS state = this.kvStore.get(key); + return new Entry<>(key, state); + } + + public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) { + SS newValue = this.storeFunctions.getStateUpdater().apply(m, oldEntry.getValue()); + this.kvStore.put(oldEntry.getKey(), newValue); + return new Entry<>(oldEntry.getKey(), newValue); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java ---------------------------------------------------------------------- diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java new file mode 100644 index 0000000..e4f5d79 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.impl.data.avro; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericRecord; +import org.apache.samza.operators.api.data.Data; +import org.apache.samza.operators.api.data.Schema; + + +public class AvroData implements Data { + protected final Object datum; + protected final AvroSchema schema; + + private AvroData(AvroSchema schema, Object datum) { + this.datum = datum; + this.schema = schema; + } + + @Override + public Schema schema() { + return this.schema; + } + + @Override + public Object value() { + return this.datum; + } + + @Override + public int intValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public long longValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public float floatValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public double doubleValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public boolean booleanValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public String strValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public byte[] bytesValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public List<Object> arrayValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public Map<Object, Object> mapValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public Data getElement(int index) { + throw new 
UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public Data getFieldData(String fldName) { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + public static AvroData getArray(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.ARRAY) { + throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType()); + } + return new AvroData(schema, datum) { + @SuppressWarnings("unchecked") + private final GenericArray<Object> array = (GenericArray<Object>) this.datum; + + @Override + public List<Object> arrayValue() { + return this.array; + } + + @Override + public Data getElement(int index) { + return this.schema.getElementType().read(array.get(index)); + } + + }; + } + + public static AvroData getMap(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.MAP) { + throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType()); + } + return new AvroData(schema, datum) { + @SuppressWarnings("unchecked") + private final Map<Object, Object> map = (Map<Object, Object>) datum; + + @Override + public Map<Object, Object> mapValue() { + return this.map; + } + + @Override + public Data getFieldData(String fldName) { + // TODO Auto-generated method stub + return this.schema.getValueType().read(map.get(fldName)); + } + + }; + } + + public static AvroData getStruct(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.STRUCT) { + throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType()); + } + return new AvroData(schema, datum) { + private final GenericRecord record = (GenericRecord) datum; + + @Override + public Data getFieldData(String fldName) { + // TODO Auto-generated method stub + return this.schema.getFieldType(fldName).read(record.get(fldName)); + } + + }; + } + + public static AvroData 
getInt(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public int intValue() { + return ((Integer) datum).intValue(); + } + + }; + } + + public static AvroData getLong(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public long longValue() { + return ((Long) datum).longValue(); + } + + }; + } + + public static AvroData getFloat(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public float floatValue() { + return ((Float) datum).floatValue(); + } + + }; + } + + public static AvroData getDouble(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public double doubleValue() { + return ((Double) datum).doubleValue(); + } + + }; + } + + public static AvroData getBoolean(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) { + throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public boolean booleanValue() { + return ((Boolean) datum).booleanValue(); + } + + }; + } + + public static AvroData getString(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public String strValue() { + return ((CharSequence) datum).toString(); + } + + }; + } + + public static AvroData getBytes(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public byte[] bytesValue() { + return ((ByteBuffer) datum).array(); + } + + }; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java new file mode 100644 index 0000000..c04e4f6 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl.data.avro; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.avro.Schema.Field; +import org.apache.samza.operators.api.data.Data; +import org.apache.samza.operators.api.data.Schema; + + +public class AvroSchema implements Schema { + + protected final org.apache.avro.Schema avroSchema; + protected final Schema.Type type; + + private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas = + new HashMap<org.apache.avro.Schema.Type, AvroSchema>(); + + static { + primSchemas.put(org.apache.avro.Schema.Type.INT, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) { + @Override + public Data read(Object datum) { + return AvroData.getInt(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.LONG, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) { + @Override + public Data read(Object datum) { + return AvroData.getLong(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.FLOAT, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) { + @Override + public Data read(Object datum) { + return AvroData.getFloat(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.DOUBLE, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) { + @Override + public 
Data read(Object datum) { + return AvroData.getDouble(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) { + @Override + public Data read(Object datum) { + return AvroData.getBoolean(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.STRING, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) { + @Override + public Data read(Object datum) { + return AvroData.getString(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.BYTES, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) { + @Override + public Data read(Object datum) { + return AvroData.getBytes(this, datum); + } + }); + }; + + public static AvroSchema getSchema(final org.apache.avro.Schema schema) { + Schema.Type type = mapType(schema.getType()); + if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) { + return primSchemas.get(schema.getType()); + } + // otherwise, construct the new schema + // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects + switch (type) { + case ARRAY: + return new AvroSchema(schema) { + @Override + public Data transform(Data input) { + // This would get all the elements until the length of the current schema's array length + if (input.schema().getType() != Schema.Type.ARRAY) { + throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " + + input.schema().getType()); + } + if (!input.schema().getElementType().equals(this.getElementType())) { + throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. 
input schema: " + + input.schema().getElementType().getType()); + } + // input type matches array type + return AvroData.getArray(this, input.value()); + } + }; + case MAP: + return new AvroSchema(schema) { + @Override + public Data transform(Data input) { + // This would get all the elements until the length of the current schema's array length + if (input.schema().getType() != Schema.Type.MAP) { + throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " + + input.schema().getType()); + } + if (!input.schema().getValueType().equals(this.getValueType())) { + throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: " + + input.schema().getValueType().getType()); + } + // input type matches map type + return AvroData.getMap(this, input.value()); + } + }; + case STRUCT: + return new AvroSchema(schema) { + @SuppressWarnings("serial") + private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() { + { + for (Field field : schema.getFields()) { + put(field.name(), getSchema(field.schema())); + } + } + }; + + @Override + public Map<String, Schema> getFields() { + return this.fldSchemas; + } + + @Override + public Schema getFieldType(String fldName) { + return this.fldSchemas.get(fldName); + } + + @Override + public Data transform(Data input) { + // This would get all the elements until the length of the current schema's array length + if (input.schema().getType() != Schema.Type.STRUCT) { + throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " + + input.schema().getType()); + } + // Note: this particular transform function only implements "projection to a sub-set" concept. 
+ // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed + for (String fldName : this.fldSchemas.keySet()) { + // check each field schema matches input + Schema fldSchema = this.fldSchemas.get(fldName); + Schema inputFld = input.schema().getFieldType(fldName); + if (!fldSchema.equals(inputFld)) { + throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName + + ". input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType()); + } + } + // input type matches struct type + return AvroData.getStruct(this, input.value()); + } + + }; + default: + throw new IllegalArgumentException("Un-recognized complext data type:" + type); + } + } + + private AvroSchema(org.apache.avro.Schema schema) { + this.avroSchema = schema; + this.type = mapType(schema.getType()); + } + + private static Type mapType(org.apache.avro.Schema.Type type) { + switch (type) { + case ARRAY: + return Schema.Type.ARRAY; + case RECORD: + return Schema.Type.STRUCT; + case MAP: + return Schema.Type.MAP; + case INT: + return Schema.Type.INTEGER; + case LONG: + return Schema.Type.LONG; + case BOOLEAN: + return Schema.Type.BOOLEAN; + case FLOAT: + return Schema.Type.FLOAT; + case DOUBLE: + return Schema.Type.DOUBLE; + case STRING: + return Schema.Type.STRING; + case BYTES: + return Schema.Type.BYTES; + default: + throw new IllegalArgumentException("Avro schema: " + type + " is not supported"); + } + } + + @Override + public Type getType() { + return this.type; + } + + @Override + public Schema getElementType() { + if (this.type != Schema.Type.ARRAY) { + throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type); + } + return getSchema(this.avroSchema.getElementType()); + } + + @Override + public Schema getValueType() { + if (this.type != Schema.Type.MAP) { + throw new UnsupportedOperationException("Can't getValueType 
with non-map schema: " + this.type); + } + return getSchema(this.avroSchema.getValueType()); + } + + @Override + public Map<String, Schema> getFields() { + throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type); + } + + @Override + public Schema getFieldType(String fldName) { + throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type); + } + + @Override + public Data read(Object object) { + if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) { + return AvroData.getArray(this, object); + } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) { + return AvroData.getMap(this, object); + } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) { + return AvroData.getStruct(this, object); + } + throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported"); + } + + @Override + public Data transform(Data inputData) { + if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP + || inputData.schema().getType() == Schema.Type.STRUCT) { + throw new IllegalArgumentException("Complex schema should have overriden the default transform() function."); + } + if (inputData.schema().getType() != this.type) { + throw new IllegalArgumentException("Can't transform a mismatched primitive type. 
this type:" + this.type + + ", input type:" + inputData.schema().getType()); + } + return inputData; + } + + @Override + public boolean equals(Schema other) { + // TODO Auto-generated method stub + if (this.type != other.getType()) { + return false; + } + switch (this.type) { + case ARRAY: + // check if element types are the same + return this.getElementType().equals(other.getElementType()); + case MAP: + // check if value types are the same + return this.getValueType().equals(other.getValueType()); + case STRUCT: + // check if the fields schemas in this equals the other + // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform() + for (String fieldName : this.getFields().keySet()) { + if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) { + return false; + } + } + return true; + default: + return true; + } + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java new file mode 100644 index 0000000..2432aca --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl.data.serializers; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.samza.SamzaException; +import org.apache.samza.serializers.Serde; +import org.apache.samza.operators.impl.data.avro.AvroData; +import org.apache.samza.operators.impl.data.avro.AvroSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class SqlAvroSerde implements Serde<AvroData> { + private static Logger log = LoggerFactory.getLogger(SqlAvroSerde.class); + + private final Schema avroSchema; + private final GenericDatumReader<GenericRecord> reader; + private final GenericDatumWriter<Object> writer; + + public SqlAvroSerde(Schema avroSchema) { + this.avroSchema = avroSchema; + this.reader = new GenericDatumReader<GenericRecord>(avroSchema); + this.writer = new GenericDatumWriter<Object>(avroSchema); + } + + @Override + public AvroData fromBytes(byte[] bytes) { + GenericRecord data; + + try { + data = reader.read(null, DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null)); + return getAvroData(data, avroSchema); + } catch (IOException e) { + String errMsg = "Cannot decode message."; + log.error(errMsg, e); + throw new SamzaException(errMsg, e); + } + } + + @Override 
+ public byte[] toBytes(AvroData object) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Encoder encoder = new BinaryEncoder(out); + + try { + writer.write(object.value(), encoder); + encoder.flush(); + return out.toByteArray(); + } catch (IOException e) { + String errMsg = "Cannot perform Avro binary encode."; + log.error(errMsg, e); + throw new SamzaException(errMsg, e); + } + } + + private AvroData getAvroData(GenericRecord data, Schema type){ + AvroSchema schema = AvroSchema.getSchema(type); + switch (type.getType()){ + case RECORD: + return AvroData.getStruct(schema, data); + case ARRAY: + return AvroData.getArray(schema, data); + case MAP: + return AvroData.getMap(schema, data); + case INT: + return AvroData.getInt(schema, data); + case LONG: + return AvroData.getLong(schema, data); + case BOOLEAN: + return AvroData.getBoolean(schema, data); + case FLOAT: + return AvroData.getFloat(schema, data); + case DOUBLE: + return AvroData.getDouble(schema, data); + case STRING: + return AvroData.getString(schema, data); + case BYTES: + return AvroData.getBytes(schema, data); + default: + throw new IllegalArgumentException("Avro schema: " + type + " is not supported"); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java new file mode 100644 index 0000000..edd8859 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.operators.impl.data.serializers;

import org.apache.avro.Schema;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.operators.impl.data.avro.AvroData;

/**
 * Builds {@link SqlAvroSerde} instances. The Avro schema definition for the serde called
 * {@code name} is taken from the config key {@code serializers.<name>.schema}.
 */
public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> {
  public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";

  /**
   * @throws SamzaException if the schema config entry is absent or empty
   */
  @Override
  public Serde<AvroData> getSerde(String name, Config config) {
    String schemaKey = String.format(PROP_AVRO_SCHEMA, name);
    String schemaText = config.get(schemaKey);
    boolean schemaMissing = schemaText == null || schemaText.isEmpty();
    if (schemaMissing) {
      throw new SamzaException("Cannot find avro schema for SerdeFactory '" + name + "'.");
    }
    Schema parsedSchema = Schema.parse(schemaText);
    return new SqlAvroSerde(parsedSchema);
  }
}
// http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
// ----------------------------------------------------------------------
// diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
// new file
mode 100644 index 0000000..1267ab6 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl.data.serializers; + +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.operators.impl.data.string.StringData; + + +public class SqlStringSerde implements Serde<StringData> { + + private final Serde<String> serde; + + public SqlStringSerde(String encoding) { + this.serde = new StringSerde(encoding); + } + + @Override + public StringData fromBytes(byte[] bytes) { + return new StringData(serde.fromBytes(bytes)); + } + + @Override + public byte[] toBytes(StringData object) { + return serde.toBytes(object.strValue()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java new file mode 100644 index 0000000..3b6a3e0 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.impl.data.serializers; + + +import org.apache.samza.config.Config; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerdeFactory; +import org.apache.samza.operators.impl.data.string.StringData; + +public class SqlStringSerdeFactory implements SerdeFactory<StringData> { + @Override + public Serde<StringData> getSerde(String name, Config config) { + return new SqlStringSerde(config.get("encoding", "UTF-8")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java new file mode 100644 index 0000000..86e9917 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.impl.data.string; + +import org.apache.samza.operators.api.data.Data; +import org.apache.samza.operators.api.data.Schema; + +import java.util.List; +import java.util.Map; + +public class StringData implements Data { + private final Object datum; + private final Schema schema; + + public StringData(Object datum) { + this.datum = datum; + this.schema = new StringSchema(); + } + + @Override + public Schema schema() { + return this.schema; + } + + @Override + public Object value() { + return this.datum; + } + + @Override + public int intValue() { + throw new UnsupportedOperationException("Can't get int value for a string type data"); + } + + @Override + public long longValue() { + throw new UnsupportedOperationException("Can't get long value for a string type data"); + } + + @Override + public float floatValue() { + throw new UnsupportedOperationException("Can't get float value for a string type data"); + } + + @Override + public double doubleValue() { + throw new UnsupportedOperationException("Can't get double value for a string type data"); + } + + @Override + public boolean booleanValue() { + throw new UnsupportedOperationException("Can't get boolean value for a string type data"); + } + + @Override + public String strValue() { + return String.valueOf(datum); + } + + @Override + public byte[] bytesValue() { + throw new UnsupportedOperationException("Can't get bytesValue for a string type data"); + } + + @Override + public List<Object> arrayValue() { + throw new UnsupportedOperationException("Can't get arrayValue for a string type data"); + } + + @Override + public Map<Object, Object> mapValue() { + throw new UnsupportedOperationException("Can't get mapValue for a string type data"); + } + + @Override + public Data getElement(int index) { + throw new UnsupportedOperationException("Can't getElement(index) on a string type data"); + } + + @Override + public Data getFieldData(String fldName) { + throw new 
UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java new file mode 100644 index 0000000..b19dfeb --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.impl.data.string; + +import org.apache.samza.operators.api.data.Data; +import org.apache.samza.operators.api.data.Schema; + +import java.util.Map; + +public class StringSchema implements Schema { + private Type type = Type.STRING; + + @Override + public Type getType() { + return Type.STRING; + } + + @Override + public Schema getElementType() { + throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type); + } + + @Override + public Schema getValueType() { + throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type); + } + + @Override + public Map<String, Schema> getFields() { + throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type); + } + + @Override + public Schema getFieldType(String fldName) { + throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type); + } + + @Override + public Data read(Object object) { + return new StringData(object); + } + + @Override + public Data transform(Data inputData) { + if (inputData.schema().getType() != this.type) { + throw new IllegalArgumentException("Can't transform a mismatched primitive type. 
this type:" + this.type + + ", input type:" + inputData.schema().getType()); + } + return inputData; + } + + @Override + public boolean equals(Schema other) { + return other.getType() == this.type; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java new file mode 100644 index 0000000..2de53aa --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.impl.window; + +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.WindowState; +import org.apache.samza.operators.api.data.Message; +import org.apache.samza.operators.api.internal.Operators.WindowOperator; +import org.apache.samza.operators.api.internal.WindowOutput; +import org.apache.samza.operators.impl.OperatorImpl; +import org.apache.samza.operators.impl.StateStoreImpl; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; + +import java.util.function.BiFunction; + + +/** + * Default implementation class of a {@link WindowOperator} for a session window. + * + * @param <M> the type of input {@link Message} + * @param <RK> the type of window key + * @param <RM> the type of aggregated value of the window + */ +public class SessionWindowImpl<M extends Message, RK, WS extends WindowState, RM extends WindowOutput<RK, ?>> extends + OperatorImpl<M, RM> { + private final BiFunction<M, Entry<RK, WS>, RM> txfmFunction; + private final StateStoreImpl<M, RK, WS> wndStore; + + SessionWindowImpl(WindowOperator<M, RK, WS, RM> sessWnd, MessageStream<M> input) { + this.txfmFunction = sessWnd.getFunction(); + this.wndStore = new StateStoreImpl<>(sessWnd.getStoreFunctions(), sessWnd.getStoreName(input)); + } + + @Override protected void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + Entry<RK, WS> state = this.wndStore.getState(message); + this.nextProcessors(this.txfmFunction.apply(message, state), collector, coordinator); + this.wndStore.updateState(message, state); + } + + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + // This is to periodically check the timeout triggers to get the list of window states to be updated + } + + @Override protected void init(TaskContext context) { + 
this.wndStore.init(context); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java new file mode 100644 index 0000000..e340fe8 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.task; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.api.MessageStream; +import org.apache.samza.operators.api.MessageStreams; +import org.apache.samza.operators.api.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.api.data.IncomingSystemMessage; +import org.apache.samza.operators.impl.ChainedOperators; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.HashMap; +import java.util.Map; + + +/** + * An adaptor task class that invoke the user-implemented (@link StreamOperatorTask}s via {@link MessageStream} programming APIs + * + */ +public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask { + /** + * A map with entries mapping {@link SystemStreamPartition} to {@link org.apache.samza.operators.impl.ChainedOperators} that takes the {@link SystemStreamPartition} + * as the input stream + */ + private final Map<SystemStreamPartition, ChainedOperators> operatorChains = new HashMap<>(); + + /** + * Wrapped {@link StreamOperatorTask} class + */ + private final StreamOperatorTask userTask; + + /** + * Constructor that wraps the user-defined {@link StreamOperatorTask} + * + * @param userTask the user-defined {@link StreamOperatorTask} + */ + public StreamOperatorAdaptorTask(StreamOperatorTask userTask) { + this.userTask = userTask; + } + + @Override + public final void init(Config config, TaskContext context) throws Exception { + if (this.userTask instanceof InitableTask) { + ((InitableTask) this.userTask).init(config, context); + } + Map<SystemStreamPartition, SystemMessageStream> sources = new HashMap<>(); + context.getSystemStreamPartitions().forEach(ssp -> { + SystemMessageStream ds = MessageStreams.input(ssp); + sources.put(ssp, ds); + }); + this.userTask.initOperators(sources.values()); + sources.forEach((ssp, ds) -> operatorChains.put(ssp, ChainedOperators.create(ds, 
context))); + } + + @Override + public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) { + this.operatorChains.get(ime.getSystemStreamPartition()).onNext(new IncomingSystemMessage(ime), collector, coordinator); + } + + @Override + public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception{ + this.operatorChains.forEach((ssp, chain) -> chain.onTimer(collector, coordinator)); + if (this.userTask instanceof WindowableTask) { + ((WindowableTask) this.userTask).window(collector, coordinator); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java new file mode 100644 index 0000000..cfdb694 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.task; + +import org.apache.samza.operators.api.MessageStreams.SystemMessageStream; +import java.util.Collection; + +/** + * This interface defines the methods that user needs to implement via the operator programming APIs. + */ +public interface StreamOperatorTask { + + /** + * Defines the method for users to initialize the operator chains consuming from all {@link SystemMessageStream}s. + * Users have to implement this function to instantiate {@link org.apache.samza.operators.impl.ChainedOperators} that + * will process each incoming {@link SystemMessageStream}. + * + * Note that each {@link SystemMessageStream} corresponds to an input {@link org.apache.samza.system.SystemStreamPartition} + * + * @param sources the collection of {@link SystemMessageStream}s that takes {@link org.apache.samza.operators.api.data.IncomingSystemMessage} + * from a {@link org.apache.samza.system.SystemStreamPartition} + */ + void initOperators(Collection<SystemMessageStream> sources); + + +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java new file mode 100644 index 0000000..0f00fdb --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessage.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api; + +import org.apache.samza.operators.api.data.Message; + + +public class TestMessage implements Message<String, String> { + + private final String key; + private final String value; + private final long timestamp; + + TestMessage(String key, String value, long timestamp) { + this.key = key; + this.value = value; + this.timestamp = timestamp; + } + + @Override public String getMessage() { + return this.value; + } + + @Override public String getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return this.timestamp; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java new file mode 100644 index 0000000..e6aa692 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStreams.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api; + +import org.apache.samza.Partition; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +public class TestMessageStreams { + + @Test public void testInput() { + SystemStreamPartition ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0)); + MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp); + assertEquals(mSysStream.getSystemStreamPartition(), ssp); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fbdd76da/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java new file mode 100644 index 0000000..8faa92c --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.api; + +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestTriggerBuilder{ + private Field earlyTriggerField; + private Field lateTriggerField; + private Field timerTriggerField; + private Field earlyTriggerUpdater; + private Field lateTriggerUpdater; + + @Before + public void testPrep() throws Exception { + this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger"); + this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger"); + this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger"); + this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater"); + this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater"); + + this.earlyTriggerField.setAccessible(true); + this.lateTriggerField.setAccessible(true); + this.timerTriggerField.setAccessible(true); + this.earlyTriggerUpdater.setAccessible(true); + this.lateTriggerUpdater.setAccessible(true); + } + + @Test public void testStaticCreators() throws NoSuchFieldException, 
IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + when(mockState.getNumberMessages()).thenReturn(2000L); + assertTrue(triggerField.apply(null, mockState)); + + Function<TestMessage, Boolean> tokenFunc = m -> true; + builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + TestMessage m = mock(TestMessage.class); + assertTrue(triggerField.apply(m, mockState)); + + builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L); + when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L); + when(m.getTimestamp()).thenReturn(19999000000L); + assertFalse(triggerField.apply(m, mockState)); + when(m.getTimestamp()).thenReturn(32000000000L); + assertTrue(triggerField.apply(m, mockState)); + when(m.getTimestamp()).thenReturn(1001000000L); + when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L); + assertTrue(triggerField.apply(m, mockState)); + + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class); + builder = TriggerBuilder.earlyTrigger(mockFunc); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + 
assertEquals(triggerField, mockFunc); + + builder = TriggerBuilder.timeoutSinceFirstMessage(10000L); + Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger = + (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getFirstMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + + builder = TriggerBuilder.timeoutSinceLastMessage(10000L); + timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getLastMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000)); + assertFalse(timerTrigger.apply(mockState)); + } + + @Test public void testAddTimerTriggers() throws IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addTimeoutSinceFirstMessage(10000L); + // exam that both earlyTrigger and timer triggers are set up + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + // check the timer trigger + Function<WindowState<Collection<TestMessage>>, 
Boolean> timerTrigger = + (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getFirstMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + + // exam that both early trigger and timer triggers are set up + builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(triggerField.apply(null, mockState)); + builder.addTimeoutSinceLastMessage(20000L); + // check the timer trigger + timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); + when(mockState.getLastMessageTimeNs()).thenReturn(0L); + assertTrue(timerTrigger.apply(mockState)); + // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion + when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); + assertFalse(timerTrigger.apply(mockState)); + } + + @Test public void testAddLateTriggers() throws IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addLateTriggerOnSizeLimit(10000L); + // exam that both earlyTrigger and lateTriggers are set up + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + 
WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(earlyTrigger.apply(null, mockState)); + // check the late trigger + BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger = + (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder); + assertFalse(lateTrigger.apply(null, mockState)); + // set the number of messages to 10001 to trigger the late trigger + when(mockState.getNumberMessages()).thenReturn(10001L); + assertTrue(lateTrigger.apply(null, mockState)); + + builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0); + // exam that both earlyTrigger and lateTriggers are set up + earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); + mockState = mock(WindowState.class); + when(mockState.getNumberMessages()).thenReturn(200L); + assertFalse(earlyTrigger.apply(null, mockState)); + // exam the lateTrigger + when(mockState.getOutputValue()).thenReturn(new ArrayList<>()); + lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder); + assertFalse(lateTrigger.apply(null, mockState)); + List<TestMessage> mockList = mock(ArrayList.class); + when(mockList.size()).thenReturn(200); + when(mockState.getOutputValue()).thenReturn(mockList); + assertTrue(lateTrigger.apply(null, mockState)); + } + + @Test public void testAddTriggerUpdater() throws IllegalAccessException { + TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); + builder.onEarlyTrigger(c -> { c.clear(); return c;} ); + List<TestMessage> collection = new ArrayList<TestMessage>() {{ + for(int i = 0; i < 10; i++) { + this.add(new TestMessage(String.format("key-%d", i), "string-value", 
System.nanoTime())); + } + }}; + // exam that earlyTriggerUpdater is set up + Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater = + (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder); + WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); + when(mockState.getOutputValue()).thenReturn(collection); + earlyTriggerUpdater.apply(mockState); + assertTrue(collection.isEmpty()); + + collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime())); + collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime())); + builder.onLateTrigger(c -> { + c.removeIf(t -> t.getKey().equals("key-to-remove")); + return c; + }); + // check the late trigger updater + Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater = + (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder); + when(mockState.getOutputValue()).thenReturn(collection); + lateTriggerUpdater.apply(mockState); + assertTrue(collection.size() == 1); + assertFalse(collection.get(0).isDelete()); + assertEquals(collection.get(0).getKey(), "key-to-stay"); + } +}
