Repository: apex-malhar Updated Branches: refs/heads/master e44caa5a5 -> 822323d02
APEXMALHAR-2100 Implementation of Inner Join operator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/fe2da3e9 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/fe2da3e9 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/fe2da3e9 Branch: refs/heads/master Commit: fe2da3e96ade8eaef8bce38237b8147adfffa494 Parents: 255bc11 Author: chaitanya <[email protected]> Authored: Tue Aug 23 14:02:52 2016 +0530 Committer: chaitanya <[email protected]> Committed: Tue Aug 23 14:02:52 2016 +0530 ---------------------------------------------------------------------- .../lib/join/AbstractInnerJoinOperator.java | 340 ++++++++++++++++++ .../AbstractManagedStateInnerJoinOperator.java | 253 +++++++++++++ .../apex/malhar/lib/join/JoinStreamCodec.java | 46 +++ .../malhar/lib/join/POJOInnerJoinOperator.java | 246 +++++++++++++ .../state/managed/AbstractManagedStateImpl.java | 4 +- .../managed/ManagedTimeStateMultiValue.java | 353 +++++++++++++++++++ .../lib/join/POJOInnerJoinOperatorTest.java | 351 ++++++++++++++++++ .../lib/join/POJOPartitionJoinOperatorTest.java | 194 ++++++++++ 8 files changed, 1786 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java new file mode 100644 index 0000000..dd58ea2 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.common.util.BaseOperator; + +/** + * <p> + * An abstract implementation of inner join operator. 
Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * <b>Properties:</b><br> + * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4<br> + * <b>leftKeyExpression</b>: key field expression for stream1.<br> + * <b>rightKeyExpression</b>: key field expression for stream2.<br> + * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br> + * <b>expiryTime</b>: Expiry time in milliseconds for stored tuples which comes from both streams<br> + * <b>isLeftKeyPrimary</b>: : Specifies whether the left key(Stream1 key) is primary or not<br> + * <b>isRightKeyPrimary</b>: : Specifies whether the right key(stream2 key) is primary or not<br> + * + * <b> Example: </b> <br> + * Left input port receives customer details and right input port receives Order details. + * Schema for the Customer be in the form of {ID, Name, CTime} + * Schema for the Order be in the form of {OID, CID, OTime} + * Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is + * matched tuples must have timestamp within 5 minutes. + * Here, leftKeyExpression = ID, rightKeyExpression = CID and Time Fields = CTime, + * OTime, expiryTime = 5 minutes </b> <br> + * + * @displayName Abstract Inner Join Operator + * @tags join + */ [email protected] +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator +{ + @NotNull + private String leftKeyExpression; + @NotNull + private String rightKeyExpression; + protected transient String[][] includeFields; + protected transient List<String> keyFieldExpressions; + protected transient List<String> timeFields; + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + private int tuplesCount; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + @NotNull + private Long expiryTime; + private boolean isLeftKeyPrimary = false; + private boolean isRightKeyPrimary = false; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data; + protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data; + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert <key,tuple> into the store where store is the stream1Data if the tuple + * receives from stream1 or viceversa. + * 3) Get the values of the key if found it in opposite store + * 4) Merge the given tuple and values found from step (3) + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ + protected void processTuple(T tuple, boolean isStream1Data) + { + Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; + K key = extractKey(tuple,isStream1Data); + if (!store.put(key, tuple)) { + return; + } + Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data; + joinStream(tuple,isStream1Data, valuestore.get(key)); + } + + /** + * Merge the given tuple and list of values. + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. 
+ * @param value list of matched tuples from the opposite stream + */ + protected void joinStream(T tuple, boolean isStream1Data, List<T> value) + { + // Merge the input tuple with each matched tuple and emit the result + if (value != null) { + for (T joinedValue : value) { + T result = isStream1Data ? mergeTuples(tuple, joinedValue) : + mergeTuples(joinedValue, tuple); + if (result != null) { + tuplesCount++; + emitTuple(result); + } + } + } + } + + @Override + public void setup(Context.OperatorContext context) + { + component = new SpillableComplexComponentImpl(new InMemSpillableStateStore()); + if (stream1Data == null && stream2Data == null) { + createStores(); + } + component.setup(context); + keyFieldExpressions = Arrays.asList(leftKeyExpression,rightKeyExpression); + if (timeFieldsStr != null) { + timeFields = Arrays.asList(timeFieldsStr.split(",")); + } + String[] streamFields = includeFieldStr.split(";"); + includeFields = new String[2][]; + for (int i = 0; i < streamFields.length; i++) { + includeFields[i] = streamFields[i].split(","); + } + windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0; + } + + @Override + public void beginWindow(long windowId) + { + component.beginWindow(windowId); + tuplesJoinedPerSec = 0; + tuplesCount = 0; + } + + @Override + public void endWindow() + { + component.endWindow(); + tuplesJoinedPerSec = (long)(tuplesCount / windowTimeSec); + } + + @Override + public void teardown() + { + component.teardown(); + } + + /** + * Extract the key from the given tuple + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + * @return the key + */ + public abstract K extractKey(T tuple, boolean isStream1Data); + + /** + * Extract the time from the given tuple + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+ * @return the time + */ + public abstract long extractTime(T tuple, boolean isStream1Data); + + /** + * Merge the given tuples + * @param tuple1 tuple belonging to stream1 + * @param tuple2 tuple belonging to stream2 + * @return the merged tuple + */ + public abstract T mergeTuples(T tuple1, T tuple2); + + /** + * Emit the given tuple + * @param tuple given tuple + */ + public abstract void emitTuple(T tuple); + + /** + * Create stores for both the streams + */ + public void createStores() + { + stream1Data = component.newSpillableByteArrayListMultimap(0,null,null); + stream2Data = component.newSpillableByteArrayListMultimap(0,null,null); + } + + /** + * Get the left key expression + * @return the leftKeyExpression + */ + public String getLeftKeyExpression() + { + return leftKeyExpression; + } + + /** + * Set the left key expression + * @param leftKeyExpression given leftKeyExpression + */ + public void setLeftKeyExpression(String leftKeyExpression) + { + this.leftKeyExpression = leftKeyExpression; + } + + /** + * Get the right key expression + * @return the rightKeyExpression + */ + public String getRightKeyExpression() + { + return rightKeyExpression; + } + + /** + * Set the right key expression + * @param rightKeyExpression given rightKeyExpression + */ + public void setRightKeyExpression(String rightKeyExpression) + { + this.rightKeyExpression = rightKeyExpression; + } + + /** + * Return the include fields of the two streams + * @return the includeFieldStr + */ + public String getIncludeFieldStr() + { + return includeFieldStr; + } + + /** + * Set the list of comma-separated fields to be added to the output tuple. + * @param includeFieldStr given includeFieldStr + */ + public void setIncludeFieldStr(@NotNull String includeFieldStr) + { + this.includeFieldStr = Preconditions.checkNotNull(includeFieldStr); + } + + /** + * Return the time fields for both the streams + * @return the timeFieldsStr + */ + public String getTimeFieldsStr() + { + return timeFieldsStr; + } + + /** + * Set the comma-separated time fields for both the streams + * @param timeFieldsStr given timeFieldsStr + */ + public void setTimeFieldsStr(String timeFieldsStr) + { + this.timeFieldsStr = timeFieldsStr; + } + + /** + * Return the expiry time + * @return the expiryTime + */ + public Long getExpiryTime() + { + return expiryTime; + } + + /** + * Set the expiry time + * @param expiryTime given expiryTime + */ + public void setExpiryTime(@NotNull Long expiryTime) + { + this.expiryTime = Preconditions.checkNotNull(expiryTime); + } + + /** + * Return whether the left key is primary or not + * @return the isLeftKeyPrimary + */ + public boolean isLeftKeyPrimary() + { + return isLeftKeyPrimary; + } + + /** + * Set the leftKeyPrimary + * @param leftKeyPrimary given leftKeyPrimary + */ + public void setLeftKeyPrimary(boolean leftKeyPrimary) + { + isLeftKeyPrimary = leftKeyPrimary; + } + + /** + * Return whether the right key is primary or not + * @return the isRightKeyPrimary + */ + public boolean isRightKeyPrimary() + { + return isRightKeyPrimary; + } + + /** + * Set the rightKeyPrimary + * @param rightKeyPrimary given rightKeyPrimary + */ + public void setRightKeyPrimary(boolean rightKeyPrimary) + { + isRightKeyPrimary = rightKeyPrimary; + } +}
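For reference, a concrete subclass only needs to supply the four abstract methods and its own ports; a minimal sketch follows (editor's illustration, not part of this commit, using a hypothetical Trade POJO carrying key and timestamp fields):

import org.apache.apex.malhar.lib.join.AbstractInnerJoinOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;

// Hypothetical POJO shared by both streams in this sketch.
class Trade
{
  public String symbol;
  public long timestamp;
}

public class TradeInnerJoinOperator extends AbstractInnerJoinOperator<String, Trade>
{
  public final transient DefaultOutputPort<Trade> output = new DefaultOutputPort<>();

  public final transient DefaultInputPort<Trade> input1 = new DefaultInputPort<Trade>()
  {
    @Override
    public void process(Trade tuple)
    {
      processTuple(tuple, true);   // tuples on this port belong to stream1
    }
  };

  public final transient DefaultInputPort<Trade> input2 = new DefaultInputPort<Trade>()
  {
    @Override
    public void process(Trade tuple)
    {
      processTuple(tuple, false);  // tuples on this port belong to stream2
    }
  };

  @Override
  public String extractKey(Trade tuple, boolean isStream1Data)
  {
    return tuple.symbol;
  }

  @Override
  public long extractTime(Trade tuple, boolean isStream1Data)
  {
    return tuple.timestamp;
  }

  @Override
  public Trade mergeTuples(Trade tuple1, Trade tuple2)
  {
    // tuple1 comes from stream1, tuple2 from stream2; keep the later timestamp
    Trade merged = new Trade();
    merged.symbol = tuple1.symbol;
    merged.timestamp = Math.max(tuple1.timestamp, tuple2.timestamp);
    return merged;
  }

  @Override
  public void emitTuple(Trade tuple)
  {
    output.emit(tuple);
  }
}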
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java new file mode 100644 index 0000000..dbf903d --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.join; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.joda.time.Duration; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue; +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; + +/** + * An abstract implementation of the inner join operator over managed state, extending + * AbstractInnerJoinOperator. + * + * <b>Properties:</b><br> + * <b>noOfBuckets</b>: Number of buckets required for managed state. <br> + * <b>bucketSpanTime</b>: Indicates the length of the time bucket. <br> + */ +@InterfaceStability.Evolving +public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends AbstractInnerJoinOperator<K,T> implements + Operator.CheckpointNotificationListener, Operator.IdleTimeHandler +{ + public static final String stateDir = "managedState"; + public static final String stream1State = "stream1Data"; + public static final String stream2State = "stream2Data"; + private transient Map<JoinEvent<K,T>, Future<List>> waitingEvents = Maps.newLinkedHashMap(); + private int noOfBuckets = 1; + private Long bucketSpanTime; + protected ManagedTimeStateImpl stream1Store; + protected ManagedTimeStateImpl stream2Store; + + /** + * Create managed-state stores for both the streams.
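+ * Both stores are ManagedTimeStateImpl instances whose time-bucket assigners are configured + * to expire state older than expiryTime, so stale tuples are purged from the join state automatically.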
+ */ + @Override + public void createStores() + { + stream1Store = new ManagedTimeStateImpl(); + stream2Store = new ManagedTimeStateImpl(); + stream1Store.setNumBuckets(noOfBuckets); + stream2Store.setNumBuckets(noOfBuckets); + if (bucketSpanTime != null) { + stream1Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + stream2Store.getTimeBucketAssigner().setBucketSpan(Duration.millis(bucketSpanTime)); + } + stream1Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + stream2Store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime())); + + stream1Data = new ManagedTimeStateMultiValue(stream1Store, !isLeftKeyPrimary()); + stream2Data = new ManagedTimeStateMultiValue(stream2Store, !isRightKeyPrimary()); + } + + /** + * Process a tuple received on an input port with the following steps: + * 1) Extract the key from the given tuple + * 2) Insert <key,tuple> into the store, where the store is stream1Data if the tuple + * arrives on stream1, or stream2Data otherwise. + * 3) Asynchronously look up the key in the opposite store + * 4) If the future has already completed, merge the given tuple with the values found in + * step (3); otherwise add it to waitingEvents + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ + @Override + protected void processTuple(T tuple, boolean isStream1Data) + { + Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; + K key = extractKey(tuple,isStream1Data); + long timeBucket = extractTime(tuple,isStream1Data); + if (!((ManagedTimeStateMultiValue)store).put(key, tuple,timeBucket)) { + return; + }
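+ // Look up matching tuples for this key in the opposite stream's store; the managed-state read may complete asynchronously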
+ Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data; + Future<List> future = ((ManagedTimeStateMultiValue)valuestore).getAsync(key); + if (future.isDone()) { + try { + joinStream(tuple,isStream1Data, future.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } else { + waitingEvents.put(new JoinEvent<>(key,tuple,isStream1Data),future); + } + } + + @Override + public void handleIdleTime() + { + if (waitingEvents.size() > 0) { + processWaitEvents(false); + } + } + + @Override + public void beforeCheckpoint(long l) + { + stream1Store.beforeCheckpoint(l); + stream2Store.beforeCheckpoint(l); + } + + @Override + public void checkpointed(long l) + { + stream1Store.checkpointed(l); + stream2Store.checkpointed(l); + } + + @Override + public void committed(long l) + { + stream1Store.committed(l); + stream2Store.committed(l); + } + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + ((FileAccessFSImpl)stream1Store.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + stateDir + Path.SEPARATOR + String.valueOf(context.getId()) + Path.SEPARATOR + stream1State); + ((FileAccessFSImpl)stream2Store.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + stateDir + Path.SEPARATOR + String.valueOf(context.getId()) + Path.SEPARATOR + stream2State); + stream1Store.getCheckpointManager().setRecoveryPath("managed_state_" + stream1State); + stream2Store.getCheckpointManager().setRecoveryPath("managed_state_" + stream2State); + stream1Store.setup(context); + stream2Store.setup(context); + } + + @Override + public void beginWindow(long windowId) + { + stream1Store.beginWindow(windowId); + stream2Store.beginWindow(windowId); + super.beginWindow(windowId); + } + + /** + * Process the waiting events + * @param finalize whether or not to wait for the futures to complete + */ + private void processWaitEvents(boolean finalize) + { + Iterator<Map.Entry<JoinEvent<K,T>, Future<List>>> waitIterator = waitingEvents.entrySet().iterator(); + while (waitIterator.hasNext()) { + Map.Entry<JoinEvent<K,T>, Future<List>> waitingEvent = waitIterator.next(); + Future<List> future = waitingEvent.getValue(); + if (future.isDone() || finalize) { + try { + JoinEvent<K,T> event = waitingEvent.getKey(); + joinStream(event.value,event.isStream1Data,future.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("end window", e); + } + waitIterator.remove(); + if (!finalize) { + break; + } + } + } + } + + @Override + public void endWindow() + { + processWaitEvents(true); + stream1Store.endWindow(); + stream2Store.endWindow(); + super.endWindow(); + } + + @Override + public void teardown() + { + stream1Store.teardown(); + stream2Store.teardown(); + super.teardown(); + } + + /** + * Return the number of buckets + * @return the noOfBuckets + */ + public int getNoOfBuckets() + { + return noOfBuckets; + } + + /** + * Set the number of buckets required for managed state + * @param noOfBuckets given noOfBuckets + */ + public void setNoOfBuckets(int noOfBuckets) + { + this.noOfBuckets = noOfBuckets; + } + + /** + * Return the bucketSpanTime + * @return the bucketSpanTime + */ + public Long getBucketSpanTime() + { + return bucketSpanTime; + } + + /** + * Sets the length of the time bucket required for managed state.
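+ * For example, a bucketSpanTime of 60000 yields one-minute time buckets (illustrative value); + * when this property is not set, the time-bucket assigner's default span is used.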
+ * @param bucketSpanTime given bucketSpanTime + */ + public void setBucketSpanTime(Long bucketSpanTime) + { + this.bucketSpanTime = bucketSpanTime; + } + + public static class JoinEvent<K,T> + { + public K key; + public T value; + public boolean isStream1Data; + + public JoinEvent(K key, T value, boolean isStream1Data) + { + this.key = key; + this.value = value; + this.isStream1Data = isStream1Data; + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java b/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java new file mode 100644 index 0000000..7a34699 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/join/JoinStreamCodec.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.join; + +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Stream codec based on keyExpression for POJO Inner Join Operator. + */ [email protected] +public class JoinStreamCodec extends KryoSerializableStreamCodec<Object> +{ + private transient PojoUtils.Getter<Object, Object> getter; + private String keyExpression; + + public JoinStreamCodec(String keyExpression) + { + this.keyExpression = keyExpression; + } + + @Override + public int getPartition(Object o) + { + if (getter == null) { + getter = PojoUtils.createGetter(o.getClass(), keyExpression, Object.class); + } + return getter.get(o).hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java new file mode 100644 index 0000000..0b23808 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperator.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator which receives POJOs on both streams. + * + * @displayName POJO Inner Join Operator + * @tags join + */ +@InterfaceStability.Evolving +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator<Object,Object> implements Operator.ActivationListener<Context> +{ + private transient long timeIncrement; + private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2); + protected transient Class<?> outputClass; + private long time = System.currentTimeMillis(); + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>() + { + @Override + public void setup(Context.PortContext context) + { + outputClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>() + { + @Override + public void setup(Context.PortContext context) + { + inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + + @Override + public void process(Object tuple) + { + processTuple(tuple,true); + } + + @Override + public StreamCodec<Object> getStreamCodec() + { + return getInnerJoinStreamCodec(true); + } + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort<Object> input2 = new DefaultInputPort<Object>() + { + @Override + public void setup(Context.PortContext context) + { + inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + + @Override + public void process(Object tuple) + { + processTuple(tuple,false); + } + + @Override + public StreamCodec<Object> getStreamCodec() + { + return getInnerJoinStreamCodec(false); + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); + super.setup(context); + for (int i = 0; i < 2; i++) { + inputFieldObjects[i] = new FieldObjectMap(); + } + } + + /** + * Extract the time value from the given tuple + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + * @return the time in milliseconds + */ + @Override + public long extractTime(Object tuple, boolean isStream1Data) + { + return timeFields == null ?
time : (long)(isStream1Data ? inputFieldObjects[0].timeFieldGet.get(tuple) : + inputFieldObjects[1].timeFieldGet.get(tuple)); + } + + /** + * Create getters for the key and time fields and setters for the include fields. + */ + private void generateSettersAndGetters() + { + for (int i = 0; i < 2; i++) { + Class inputClass = inputFieldObjects[i].inputClass; + try { + inputFieldObjects[i].keyGet = PojoUtils.createGetter(inputClass, keyFieldExpressions.get(i), Object.class); + if (timeFields != null && timeFields.size() == 2) { + Class timeField = ClassUtils.primitiveToWrapper(inputClass.getField(timeFields.get(i)).getType()); + inputFieldObjects[i].timeFieldGet = PojoUtils.createGetter(inputClass, timeFields.get(i), timeField); + } + for (int j = 0; j < includeFields[i].length; j++) { + Class inputField = ClassUtils.primitiveToWrapper(inputClass.getField(includeFields[i][j]).getType()); + Class outputField = ClassUtils.primitiveToWrapper(outputClass.getField(includeFields[i][j]).getType()); + if (inputField != outputField) { + continue; + } + inputFieldObjects[i].fieldMap.put(PojoUtils.createGetter(inputClass, includeFields[i][j], inputField), + PojoUtils.createSetter(outputClass, includeFields[i][j], outputField)); + } + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Extract the key value from the given tuple + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + * @return the key object + */ + @Override + public Object extractKey(Object tuple, boolean isStream1Data) + { + return isStream1Data ? inputFieldObjects[0].keyGet.get(tuple) : + inputFieldObjects[1].keyGet.get(tuple); + } + + /** + * Merge the given tuples + * @param tuple1 tuple belonging to stream1 + * @param tuple2 tuple belonging to stream2 + * @return the merged output object + */ + @Override + public Object mergeTuples(Object tuple1, Object tuple2) + { + Object o; + try { + o = outputClass.newInstance(); + for (Map.Entry<PojoUtils.Getter,PojoUtils.Setter> g: inputFieldObjects[0].fieldMap.entrySet()) { + g.getValue().set(o, g.getKey().get(tuple1)); + } + for (Map.Entry<PojoUtils.Getter,PojoUtils.Setter> g: inputFieldObjects[1].fieldMap.entrySet()) { + g.getValue().set(o, g.getKey().get(tuple2)); + } + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException(e); + } + return o; + } + + /** + * Emit the given tuple through the outputPort + * @param tuple given tuple + */ + @Override + public void emitTuple(Object tuple) + { + outputPort.emit(tuple); + } + + @Override + public void activate(Context context) + { + generateSettersAndGetters(); + } + + @Override + public void deactivate() + { + } + + @Override + public void endWindow() + { + super.endWindow(); + time += timeIncrement; + } + + /** + * Returns the stream codec for the given stream + * @param isStream1data Specifies whether the codec is needed for stream1 or stream2.
+ * @return the object of JoinStreamCodec + */ + private StreamCodec<Object> getInnerJoinStreamCodec(boolean isStream1data) + { + if (isStream1data) { + return new JoinStreamCodec(getLeftKeyExpression()); + } + return new JoinStreamCodec(getRightKeyExpression()); + } + + private class FieldObjectMap + { + public Class<?> inputClass; + public PojoUtils.Getter keyGet; + public PojoUtils.Getter timeFieldGet; + public Map<PojoUtils.Getter,PojoUtils.Setter> fieldMap; + + public FieldObjectMap() + { + fieldMap = new HashMap<>(); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java index 927a6df..25b3f8b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java @@ -172,6 +172,8 @@ public abstract class AbstractManagedStateImpl protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId = Multimaps.synchronizedListMultimap(ArrayListMultimap.<Long, ValueFetchTask>create()); + protected Bucket.ReadSource asyncReadSource = Bucket.ReadSource.ALL; + @Override public void setup(OperatorContext context) { @@ -569,7 +571,7 @@ public abstract class AbstractManagedStateImpl synchronized (bucket) { //a particular bucket should only be handled by one thread at any point of time. Handling of bucket here //involves creating readers for the time buckets and de-serializing key/value from a reader. - Slice value = bucket.get(key, timeBucketId, Bucket.ReadSource.ALL); + Slice value = bucket.get(key, timeBucketId, this.managedState.asyncReadSource); managedState.tasksPerBucketId.remove(bucket.getBucketId(), this); return value; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java new file mode 100644 index 0000000..fd7250d --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * Concrete implementation of SpillableByteArrayListMultimap, used by the join operator. + * + * <b>Properties:</b><br> + * <b>isKeyContainsMultiValue</b>: Specifies whether a key can have multiple values. <br> + * <b>timeBucket</b>: Specifies the length of the time bucket. + * + */ +@InterfaceStability.Evolving +public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V> +{ + private transient StreamCodec streamCodec = null; + private boolean isKeyContainsMultiValue = false; + private long timeBucket; + @NotNull + private ManagedTimeStateImpl store; + + public ManagedTimeStateMultiValue() + { + if (streamCodec == null) { + streamCodec = new KryoSerializableStreamCodec(); + } + } + + public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isKeyContainsMultiValue) + { + this(); + this.store = Preconditions.checkNotNull(store); + this.store.asyncReadSource = Bucket.ReadSource.READERS; + this.isKeyContainsMultiValue = isKeyContainsMultiValue; + } + + /** + * Return the list of values for the given key from the store + * @param k given key + * @return list of values, or null if the key is not present + */ + @Override + public List<V> get(@Nullable K k) + { + List<V> value = null; + Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k)); + if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) { + return null; + } + if (isKeyContainsMultiValue) { + return (List<V>)streamCodec.fromByteArray(valueSlice); + } + value = new ArrayList<>(); + value.add((V)streamCodec.fromByteArray(valueSlice)); + return value; + } + + /** + * Returns a Future from the store.
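+ * The lookup is served by the store's asynchronous readers (Bucket.ReadSource.READERS), + * so it may complete on a separate thread.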
+ * @param k given key + * @return a future holding the list of values for the key + */ + public CompositeFuture getAsync(@Nullable K k) + { + return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k))); + } + + @Override + public Set<K> keySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Multiset<K> keys() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<V> values() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<Map.Entry<K, V>> entries() + { + throw new UnsupportedOperationException(); + } + + @Override + public List<V> removeAll(@Nullable Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + // no-op + } + + @Override + public int size() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmpty() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsKey(@Nullable Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsValue(@Nullable Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsEntry(@Nullable Object o, @Nullable Object o1) + { + throw new UnsupportedOperationException(); + } + + /** + * Inserts (k,v) into the store. + * @param k key + * @param v value + * @return true if the given (k,v) was successfully inserted into the store, false otherwise. + */ + @Override + public boolean put(@Nullable K k, @Nullable V v) + { + if (isKeyContainsMultiValue) { + Slice keySlice = streamCodec.toByteArray(k); + int bucketId = getBucketId(k); + Slice valueSlice = store.getSync(bucketId, keySlice); + List<V> listOb; + if (valueSlice == null || valueSlice.length == 0) { + listOb = new ArrayList<>(); + } else { + listOb = (List<V>)streamCodec.fromByteArray(valueSlice); + } + listOb.add(v); + return insertInStore(bucketId, timeBucket, keySlice, streamCodec.toByteArray(listOb)); + } + return insertInStore(getBucketId(k), timeBucket, streamCodec.toByteArray(k),streamCodec.toByteArray(v)); + } + + /** + * Inserts (k,v) into the store using the specified time bucket. + * @param k key + * @param v value + * @param timeBucket time bucket + * @return true if the given (k,v) was successfully inserted into the store, false otherwise. + */ + public boolean put(@Nullable K k, @Nullable V v, long timeBucket) + { + if (isKeyContainsMultiValue) { + Slice keySlice = streamCodec.toByteArray(k); + int bucketId = getBucketId(k); + Slice valueSlice = store.getSync(bucketId, keySlice); + List<V> listOb; + if (valueSlice == null || valueSlice.length == 0) { + listOb = new ArrayList<>(); + } else { + listOb = (List<V>)streamCodec.fromByteArray(valueSlice); + } + listOb.add(v); + return insertInStore(bucketId, timeBucket, keySlice, streamCodec.toByteArray(listOb)); + } + return insertInStore(getBucketId(k), timeBucket, streamCodec.toByteArray(k),streamCodec.toByteArray(v)); + } + + /** + * Insert (keySlice,valueSlice) into the store using the given bucketId and timeBucket. + * @param bucketId bucket Id + * @param timeBucket time bucket + * @param keySlice key slice + * @param valueSlice value slice + * @return true if the given (keySlice,valueSlice) was successfully inserted into the + * store, false otherwise
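+ * (insertion fails when the given time falls before the expiry boundary maintained by the time-bucket assigner).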
+ */ + private boolean insertInStore(long bucketId, long timeBucket, Slice keySlice, Slice valueSlice) + { + long timeBucketId = store.getTimeBucketAssigner().getTimeBucketAndAdjustBoundaries(timeBucket); + if (timeBucketId != -1) { + store.putInBucket(bucketId, timeBucketId, keySlice, valueSlice); + return true; + } + return false; + } + + @Override + public boolean remove(@Nullable Object o, @Nullable Object o1) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean putAll(@Nullable K k, Iterable<? extends V> iterable) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean putAll(Multimap<? extends K, ? extends V> multimap) + { + throw new UnsupportedOperationException(); + } + + @Override + public List<V> replaceValues(K k, Iterable<? extends V> iterable) + { + throw new UnsupportedOperationException(); + } + + @Override + public Map<K, Collection<V>> asMap() + { + throw new UnsupportedOperationException(); + } + + public int getBucketId(K k) + { + // mask the sign bit so that negative hash codes do not produce a negative bucket id + return (k.hashCode() & 0x7fffffff) % store.getNumBuckets(); + } + + public long getTimeBucket() + { + return timeBucket; + } + + public void setTimeBucket(long timeBucket) + { + this.timeBucket = timeBucket; + } + + public StreamCodec getStreamCodec() + { + return streamCodec; + } + + public void setStreamCodec(StreamCodec streamCodec) + { + this.streamCodec = streamCodec; + } + + public class CompositeFuture implements Future<List> + { + public Future<Slice> slice; + + public CompositeFuture(Future<Slice> slice) + { + this.slice = slice; + } + + @Override + public boolean cancel(boolean b) + { + return slice.cancel(b); + } + + @Override + public boolean isCancelled() + { + return slice.isCancelled(); + } + + @Override + public boolean isDone() + { + return slice.isDone(); + } + + /** + * Converts the stored value(s) into a list. + * @return the list of values + * @throws InterruptedException if the read is interrupted + * @throws ExecutionException if the read fails + */ + @Override + public List get() throws InterruptedException, ExecutionException + { + List<V> value = null; + Slice valueSlice = slice.get(); + if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) { + return null; + } + if (isKeyContainsMultiValue) { + value = (List<V>)streamCodec.fromByteArray(valueSlice); + } else { + value = new ArrayList<>(); + value.add((V)streamCodec.fromByteArray(valueSlice)); + } + return value; + } + + @Override + public List get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException + { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java new file mode 100644 index 0000000..0b31c32 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOInnerJoinOperatorTest.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.join; + +import java.io.IOException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; +import com.datatorrent.stram.engine.PortContext; + +public class POJOInnerJoinOperatorTest +{ + @Rule + public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo(); + private static final String APPLICATION_PATH_PREFIX = "target/InnerJoinPOJOTest/"; + private String applicationPath; + private Attribute.AttributeMap.DefaultAttributeMap attributes; + Context.OperatorContext context; + + public static class Customer + { + public int ID; + public String Name; + public Customer() + { + } + + public Customer(int ID, String name) + { + this.ID = ID; + Name = name; + } + + @Override + public String toString() + { + return "Customer{" + + "ID=" + ID + + ", Name='" + Name + '\'' + + '}'; + } + } + + public static class Order + { + public int OID; + public int CID; + public int Amount; + + public Order() + { + } + + public Order(int OID, int CID, int amount) + { + this.OID = OID; + this.CID = CID; + Amount = amount; + } + + @Override + public String toString() + { + return "Order{" + + "OID=" + OID + + ", CID=" + CID + + ", Amount=" + Amount + + '}'; + } + } + + public static class CustOrder + { + public int ID; + public String Name; + public int OID; + public int Amount; + + public CustOrder() + { + } + + @Override + public String toString() + { + return "{" + + "ID=" + ID + + ", Name='" + Name + '\'' + + ", OID=" + OID + + ", Amount=" + Amount + + '}'; + } + } + + @Before + public void beforeTest() + { + applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX); + attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, applicationPath); + context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); + } + + @After + public void afterTest() + { + Path root = new Path(applicationPath); + try { + FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration()); + fs.delete(root, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testInnerJoinOperator() throws IOException, InterruptedException + { + POJOInnerJoinOperator oper = new POJOInnerJoinOperator(); + oper.setIncludeFieldStr("ID,Name;OID,Amount"); + oper.setLeftKeyExpression("ID"); + oper.setRightKeyExpression("CID"); + oper.setExpiryTime(10000L); + + oper.setup(context); + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class); + oper.outputPort.setup(new PortContext(attributes,context)); + + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class); + 
oper.input1.setup(new PortContext(attributes,context)); + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class); + oper.input2.setup(new PortContext(attributes,context)); + oper.activate(context); + + CollectorTestSink<CustOrder> sink = new CollectorTestSink<>(); + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorTestSink<Object> tmp = (CollectorTestSink)sink; + oper.outputPort.setSink(tmp); + oper.beginWindow(0); + + Customer tuple = new Customer(1, "Anil"); + + oper.input1.process(tuple); + Order order = new Order(102, 1, 300); + + oper.input2.process(order); + Order order2 = new Order(103, 3, 300); + oper.input2.process(order2); + Order order3 = new Order(104, 7, 300); + oper.input2.process(order3); + + oper.endWindow(); + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size()); + CustOrder emitted = sink.collectedTuples.iterator().next(); + + Assert.assertEquals("value of ID :", tuple.ID, emitted.ID); + Assert.assertEquals("value of Name :", tuple.Name, emitted.Name); + + Assert.assertEquals("value of OID: ", order.OID, emitted.OID); + Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount); + + oper.teardown(); + } + + @Test + public void testMultipleValues() throws IOException, InterruptedException + { + POJOInnerJoinOperator oper = new POJOInnerJoinOperator(); + oper.setIncludeFieldStr("ID,Name;OID,Amount"); + oper.setLeftKeyExpression("ID"); + oper.setRightKeyExpression("CID"); + oper.setExpiryTime(10000L); + + oper.setup(context); + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class); + oper.outputPort.setup(new PortContext(attributes,context)); + + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class); + oper.input1.setup(new PortContext(attributes,context)); + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class); + oper.input2.setup(new PortContext(attributes,context)); + oper.activate(context); + + CollectorTestSink<CustOrder> sink = new CollectorTestSink<>(); + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorTestSink<Object> tmp = (CollectorTestSink)sink; + oper.outputPort.setSink(tmp); + + oper.beginWindow(0); + Order order = new Order(102, 1, 300); + oper.input2.process(order); + + Order order2 = new Order(103, 3, 300); + oper.input2.process(order2); + oper.endWindow(); + oper.beginWindow(1); + + Order order3 = new Order(104, 1, 300); + oper.input2.process(order3); + Customer tuple = new Customer(1, "Anil"); + oper.input1.process(tuple); + oper.endWindow(); + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size()); + CustOrder emitted = sink.collectedTuples.get(0); + + Assert.assertEquals("value of ID :", tuple.ID, emitted.ID); + Assert.assertEquals("value of Name :", tuple.Name, emitted.Name); + Assert.assertEquals("value of OID: ", order.OID, emitted.OID); + Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount); + + emitted = sink.collectedTuples.get(1); + Assert.assertEquals("value of ID :", tuple.ID, emitted.ID); + Assert.assertEquals("value of Name :", tuple.Name, emitted.Name); + Assert.assertEquals("value of OID: ", order3.OID, emitted.OID); + Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount); + oper.teardown(); + } + + @Test + public void testUpdateStream1Values() throws IOException, InterruptedException + { + POJOInnerJoinOperator oper = new POJOInnerJoinOperator(); + oper.setIncludeFieldStr("ID,Name;OID,Amount"); + oper.setLeftKeyExpression("ID"); + 
oper.setRightKeyExpression("CID"); + oper.setLeftKeyPrimary(true); + oper.setExpiryTime(10000L); + + oper.setup(context); + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class); + oper.outputPort.setup(new PortContext(attributes,context)); + + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class); + oper.input1.setup(new PortContext(attributes,context)); + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class); + oper.input2.setup(new PortContext(attributes,context)); + oper.activate(context); + + CollectorTestSink<CustOrder> sink = new CollectorTestSink<>(); + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorTestSink<Object> tmp = (CollectorTestSink)sink; + oper.outputPort.setSink(tmp); + + oper.beginWindow(0); + Customer tuple1 = new Customer(1, "Anil"); + oper.input1.process(tuple1); + oper.endWindow(); + oper.beginWindow(1); + + Customer tuple2 = new Customer(1, "Join"); + oper.input1.process(tuple2); + Order order = new Order(102, 1, 300); + oper.input2.process(order); + Order order2 = new Order(103, 3, 300); + oper.input2.process(order2); + oper.endWindow(); + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size()); + CustOrder emitted = sink.collectedTuples.get(0); + + Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID); + Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name); + Assert.assertEquals("value of OID: ", order.OID, emitted.OID); + Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount); + oper.teardown(); + } + + @Test + public void testEmitMultipleTuplesFromStream2() throws IOException, InterruptedException + { + POJOInnerJoinOperator oper = new POJOInnerJoinOperator(); + oper.setIncludeFieldStr("ID,Name;OID,Amount"); + oper.setLeftKeyExpression("ID"); + oper.setRightKeyExpression("CID"); + oper.setLeftKeyPrimary(true); + oper.setExpiryTime(10000L); + + oper.setup(context); + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class); + oper.outputPort.setup(new PortContext(attributes,context)); + + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Customer.class); + oper.input1.setup(new PortContext(attributes,context)); + attributes.put(DAG.InputPortMeta.TUPLE_CLASS, Order.class); + oper.input2.setup(new PortContext(attributes,context)); + oper.activate(context); + + CollectorTestSink<CustOrder> sink = new CollectorTestSink<>(); + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorTestSink<Object> tmp = (CollectorTestSink)sink; + oper.outputPort.setSink(tmp); + + oper.beginWindow(0); + Customer tuple1 = new Customer(1, "Anil"); + oper.input1.process(tuple1); + Order order = new Order(102, 1, 300); + oper.input2.process(order); + Order order2 = new Order(103, 1, 300); + oper.input2.process(order2); + oper.endWindow(); + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size()); + CustOrder emitted = sink.collectedTuples.get(0); + + Assert.assertEquals("value of ID :", tuple1.ID, emitted.ID); + Assert.assertEquals("value of Name :", tuple1.Name, emitted.Name); + Assert.assertEquals("value of OID: ", order.OID, emitted.OID); + Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount); + emitted = sink.collectedTuples.get(1); + Assert.assertEquals("value of ID :", tuple1.ID, emitted.ID); + Assert.assertEquals("value of Name :", tuple1.Name, emitted.Name); + Assert.assertEquals("value of OID: ", order2.OID, emitted.OID); + Assert.assertEquals("value of Amount: ", order2.Amount, 
emitted.Amount); + oper.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fe2da3e9/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java new file mode 100644 index 0000000..9f6161b --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/join/POJOPartitionJoinOperatorTest.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.join; + +import java.util.Date; +import java.util.HashMap; +import java.util.Random; +import java.util.concurrent.CountDownLatch; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.Stats; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +public class POJOPartitionJoinOperatorTest +{ + public static final int NUM_OF_PARTITIONS = 4; + public static final int TOTAL_TUPLES_PROCESS = 1000; + private static boolean testFailed = false; + + public static class PartitionTestJoinOperator extends POJOInnerJoinOperator implements StatsListener + { + public int operatorId; + HashMap<Integer, Integer> partitionMap = Maps.newHashMap(); + transient CountDownLatch latch = new CountDownLatch(1); + int tuplesProcessed = 0; + @AutoMetric + int tuplesProcessedCompletely = 0; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + operatorId = context.getId(); + } + + @Override + protected void processTuple(Object tuple, boolean isStream1Data) + { + // Verifying the data for stream1 + if (!isStream1Data) { + return; + } + int key = (int)extractKey(tuple, isStream1Data); + if (partitionMap.containsKey(key)) { + if (partitionMap.get(key) != operatorId) { + testFailed = true; + } + } else { + partitionMap.put(key, operatorId); + } + tuplesProcessed++; + } + + @Override + public void endWindow() + { + super.endWindow(); + tuplesProcessedCompletely = tuplesProcessed; + } + + @Override + public StatsListener.Response 
processStats(StatsListener.BatchedOperatorStats stats) + { + Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1); + tuplesProcessedCompletely = (Integer)operatorStats.metrics.get("tuplesProcessedCompletely"); + if (tuplesProcessedCompletely >= TOTAL_TUPLES_PROCESS) { + latch.countDown(); + } + return null; + } + } + + public static class TestGenerator extends BaseOperator implements InputOperator + { + public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>(); + private final transient Random r = new Random(); + + @Override + public void emitTuples() + { + TestEvent event = new TestEvent(); + event.id = r.nextInt(100); + output.emit(event); + } + } + + public static class TestEvent + { + public int id; + public Date eventTime; + + public TestEvent() + { + } + + public int getId() + { + return id; + } + + public void setId(int id) + { + this.id = id; + } + + public Date getEventTime() + { + return eventTime; + } + + public void setEventTime(Date eventTime) + { + this.eventTime = eventTime; + } + } + + public static class JoinApp implements StreamingApplication + { + public PartitionTestJoinOperator joinOp; + + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + TestGenerator gen1 = dag.addOperator("Generator1", new TestGenerator()); + TestGenerator gen2 = dag.addOperator("Generator2", new TestGenerator()); + + joinOp = dag.addOperator("Join", new PartitionTestJoinOperator()); + joinOp.setLeftKeyExpression("id"); + joinOp.setRightKeyExpression("id"); + joinOp.setIncludeFieldStr("id,eventTime;id,eventTime"); + joinOp.setExpiryTime(10000L); + + ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator()); + + dag.addStream("Gen1ToJoin", gen1.output, joinOp.input1); + dag.addStream("Gen2ToJoin", gen2.output, joinOp.input2); + dag.addStream("JoinToConsole", joinOp.outputPort, console.input); + dag.setInputPortAttribute(joinOp.input1, DAG.InputPortMeta.TUPLE_CLASS,TestEvent.class); + dag.setInputPortAttribute(joinOp.input2, DAG.InputPortMeta.TUPLE_CLASS,TestEvent.class); + dag.setOutputPortAttribute(joinOp.outputPort, DAG.InputPortMeta.TUPLE_CLASS,TestEvent.class); + dag.setAttribute(joinOp, Context.OperatorContext.PARTITIONER, + new StatelessPartitioner<PartitionTestJoinOperator>(NUM_OF_PARTITIONS)); + } + } + + /** + * This test validates whether a tuple key goes to exactly one partition + */ + @Test + public void testJoinOpStreamCodec() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + JoinApp app = new JoinApp(); + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + app.joinOp.latch.await(); + Assert.assertFalse(testFailed); + } + +}
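For quick reference, wiring the operator for the Customer/Order example from the AbstractInnerJoinOperator javadoc looks roughly like this inside a StreamingApplication's populateDAG (a sketch mirroring the property values used in the tests above; customerInput and orderInput stand for any operators emitting Customer and Order POJOs):

POJOInnerJoinOperator join = dag.addOperator("Join", new POJOInnerJoinOperator());
join.setLeftKeyExpression("ID");                // Customer.ID
join.setRightKeyExpression("CID");              // Order.CID
join.setIncludeFieldStr("ID,Name;OID,Amount");  // fields copied into the CustOrder output POJO
join.setExpiryTime(10000L);                     // same expiry as in the tests
dag.addStream("CustomersToJoin", customerInput.output, join.input1);
dag.addStream("OrdersToJoin", orderInput.output, join.input2);
dag.setInputPortAttribute(join.input1, DAG.InputPortMeta.TUPLE_CLASS, Customer.class);
dag.setInputPortAttribute(join.input2, DAG.InputPortMeta.TUPLE_CLASS, Order.class);
dag.setOutputPortAttribute(join.outputPort, DAG.InputPortMeta.TUPLE_CLASS, CustOrder.class);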
