APEX-2 #resolve Support for the persist operation on the StreamMeta object.
With this change, an entire stream or specific sinks can be persisted.
The stream persistence flow captures the union of data flowing to all
connected sinks.
Added unit tests for the same.
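For reference, a minimal sketch of the new API in use, mirroring the added
unit tests (operator and stream names are illustrative):

    LogicalPlan dag = new LogicalPlan();
    TestGeneratorInputOperator input = dag.addOperator("input", TestGeneratorInputOperator.class);
    GenericTestOperator sink = dag.addOperator("sink", new GenericTestOperator());
    TestRecieverOperator persister = new TestRecieverOperator();

    // Persist the entire stream through the persist operator's input port
    StreamMeta stream = dag.addStream("Stream1", input.outport, sink.inport1);
    stream.persistUsing("Stream1_persister", persister, persister.inport);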
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f68d249c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f68d249c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f68d249c
Branch: refs/heads/devel-3
Commit: f68d249c35db19c92dd32b641662d97d6ef8ff35
Parents: 9116c70
Author: ishark <[email protected]>
Authored: Wed Jul 22 18:09:41 2015 -0700
Committer: ishark <[email protected]>
Committed: Mon Aug 24 16:55:54 2015 -0700
----------------------------------------------------------------------
api/src/main/java/com/datatorrent/api/DAG.java | 32 +
.../stram/engine/StreamingContainer.java | 17 +-
.../plan/logical/DefaultKryoStreamCodec.java | 87 ++
.../stram/plan/logical/LogicalPlan.java | 200 +++-
.../StreamCodecWrapperForPersistance.java | 89 ++
.../stram/plan/physical/PhysicalPlan.java | 110 ++-
.../stram/stream/BufferServerSubscriber.java | 68 +-
.../stram/stream/PartitionAwareSink.java | 8 +-
.../PartitionAwareSinkForPersistence.java | 45 +
.../stram/plan/StreamPersistanceTests.java | 988 +++++++++++++++++++
10 files changed, 1626 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f68d249c/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index c452ac6..9c6c492 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -106,6 +106,38 @@ public interface DAG extends DAGContext, Serializable
public StreamMeta addSink(Operator.InputPort<?> port);
+ /**
+ * Persist the entire stream using the operator passed.
+ *
+ * @param name Name for the persist operator
+ * @param persistOperator Operator to use for persisting
+ * @param persistOperatorInputPort Input port to use for persisting
+ * @return Object that describes the meta for the stream.
+ */
+ public StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> persistOperatorInputPort);
+
+ /**
+ * Persist the entire stream using the operator passed. The input port to
+ * use is selected automatically; the persist operator must have at most
+ * one non-optional input port.
+ *
+ * @param name Name for the persist operator
+ * @param persistOperator Operator to use for persisting
+ * @return Object that describes the meta for the stream.
+ */
+ public StreamMeta persistUsing(String name, Operator persistOperator);
+
+ /**
+ * Persist the portion of the stream flowing to a specific sink, using the
+ * operator passed.
+ *
+ * @param name Name for the persist operator
+ * @param persistOperator Operator to use for persisting
+ * @param persistOperatorInputPort Input port to use for persisting
+ * @param sinkToPersist Sink to persist
+ * @return Object that describes the meta for the stream.
+ */
+ public StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> persistOperatorInputPort, Operator.InputPort<?> sinkToPersist);
+
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f68d249c/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 2718439..2705093 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -76,6 +76,7 @@ import com.datatorrent.stram.debug.StdOutErrLog;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
import com.datatorrent.stram.plan.logical.Operators.PortMappingDescriptor;
+import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.stream.*;
@@ -1115,6 +1116,9 @@ public class StreamingContainer extends YarnContainerMain
BufferServerSubscriber subscriber = fastPublisherSubscriber
? new FastSubscriber("tcp://".concat(nidi.bufferServerHost).concat(":").concat(String.valueOf(nidi.bufferServerPort)).concat("/").concat(connIdentifier), queueCapacity)
: new BufferServerSubscriber("tcp://".concat(nidi.bufferServerHost).concat(":").concat(String.valueOf(nidi.bufferServerPort)).concat("/").concat(connIdentifier), queueCapacity);
+ if (streamCodec instanceof StreamCodecWrapperForPersistance) {
+ subscriber.acquireReservoirForPersistStream(sinkIdentifier, queueCapacity, streamCodec);
+ }
SweepableReservoir reservoir = subscriber.acquireReservoir(sinkIdentifier, queueCapacity);
if (checkpoint.windowId >= 0) {
node.connectInputPort(nidi.portName, new WindowIdActivatedReservoir(sinkIdentifier, reservoir, checkpoint.windowId));
@@ -1176,10 +1180,17 @@ public class StreamingContainer extends YarnContainerMain
}
/* here everything should be multisink capable */
- if (nidi.partitionKeys == null || nidi.partitionKeys.isEmpty()) {
+ if (streamCodec instanceof StreamCodecWrapperForPersistance) {
+ PartitionAwareSinkForPersistence pas;
+ if (nidi.partitionKeys == null) {
+ pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>) streamCodec, nidi.partitionMask, stream);
+ } else {
+ pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>) streamCodec, nidi.partitionKeys, nidi.partitionMask, stream);
+ }
+ ((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, pas);
+ } else if (nidi.partitionKeys == null || nidi.partitionKeys.isEmpty()) {
((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, stream);
- }
- else {
+ } else {
/*
* generally speaking we do not have partitions on the inline streams so the control should not
* come here but if it comes, then we are ready to handle it using the partition aware streams.
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f68d249c/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java
new file mode 100644
index 0000000..12584f9
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DefaultKryoStreamCodec.java
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.common.util.SerializableObject;
+import com.datatorrent.netlet.util.DTThrowable;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This codec is used for serializing the objects of class which are Kryo
+ * serializable. Used for stream codec wrapper used for persistence
+ */
+public class DefaultKryoStreamCodec<T> extends SerializableObject implements StreamCodec<T>
+{
+ final static Logger logger = LoggerFactory.getLogger(DefaultKryoStreamCodec.class);
+
+ private static final long serialVersionUID = 1L;
+ protected final transient Kryo kryo;
+
+ public DefaultKryoStreamCodec()
+ {
+ this.kryo = new Kryo();
+ this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ }
+
+ @Override
+ public Object fromByteArray(Slice fragment)
+ {
+ try {
+ ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
+ Input input = new Input(is);
+ Object returnObject = kryo.readClassAndObject(input);
+ is.close();
+ return returnObject;
+ } catch (IOException e) {
+ DTThrowable.wrapIfChecked(e);
+ }
+ return null;
+ }
+
+ @Override
+ public Slice toByteArray(T info)
+ {
+ Slice slice = null;
+ try {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ Output output = new Output(os);
+ kryo.writeClassAndObject(output, info);
+ output.flush();
+ byte[] bytes = os.toByteArray(); // copy once instead of calling toByteArray() twice
+ slice = new Slice(bytes, 0, bytes.length);
+ os.close();
+ } catch (IOException e) {
+ DTThrowable.wrapIfChecked(e);
+ }
+ return slice;
+ }
+
+ @Override
+ public int getPartition(T t)
+ {
+ return t.hashCode();
+ }
+}
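A quick sketch of the round-trip contract this codec provides (the tuple
value here is illustrative; any Kryo-serializable object works):

    DefaultKryoStreamCodec<String> codec = new DefaultKryoStreamCodec<String>();
    Slice slice = codec.toByteArray("tuple-1");   // Kryo writeClassAndObject
    Object restored = codec.fromByteArray(slice); // Kryo readClassAndObject
    assert "tuple-1".equals(restored);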
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f68d249c/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 2d088b8..b9c7c19 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -21,6 +21,7 @@ import java.beans.PropertyDescriptor;
import java.io.*;
import java.lang.reflect.*;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import javax.validation.*;
@@ -390,10 +391,19 @@ public class LogicalPlan implements Serializable, DAG
private final List<InputPortMeta> sinks = new ArrayList<InputPortMeta>();
private OutputPortMeta source;
private final String id;
+ private OperatorMeta persistOperatorForStream;
+ private InputPortMeta persistOperatorInputPort;
+ private Set<InputPortMeta> enableSinksForPersisting;
+ private String persistOperatorName;
+ public Map<InputPortMeta, OperatorMeta> sinkSpecificPersistOperatorMap;
+ public Map<InputPortMeta, InputPortMeta> sinkSpecificPersistInputPortMap;
private StreamMeta(String id)
{
this.id = id;
+ enableSinksForPersisting = new HashSet<InputPortMeta>();
+ sinkSpecificPersistOperatorMap = new HashMap<LogicalPlan.InputPortMeta, OperatorMeta>();
+ sinkSpecificPersistInputPortMap = new HashMap<LogicalPlan.InputPortMeta, InputPortMeta>();
}
@Override
@@ -403,12 +413,14 @@ public class LogicalPlan implements Serializable, DAG
}
@Override
- public Locality getLocality() {
+ public Locality getLocality()
+ {
return this.locality;
}
@Override
- public StreamMeta setLocality(Locality locality) {
+ public StreamMeta setLocality(Locality locality)
+ {
this.locality = locality;
return this;
}
@@ -458,13 +470,27 @@ public class LogicalPlan implements Serializable, DAG
return this;
}
- public void remove() {
+ public void remove()
+ {
for (InputPortMeta ipm : this.sinks) {
ipm.getOperatorWrapper().inputStreams.remove(ipm);
if (ipm.getOperatorWrapper().inputStreams.isEmpty()) {
rootOperators.add(ipm.getOperatorWrapper());
}
}
+ // Remove persist operator added at stream level, if present:
+ if (getPersistOperator() != null) {
+ removeOperator(getPersistOperator().getOperator());
+ }
+
+ // Remove persist operators added for specific sinks:
+ if (!sinkSpecificPersistOperatorMap.isEmpty()) {
+ for (Entry<InputPortMeta, OperatorMeta> entry : sinkSpecificPersistOperatorMap.entrySet()) {
+ removeOperator(entry.getValue().getOperator());
+ }
+ sinkSpecificPersistOperatorMap.clear();
+ sinkSpecificPersistInputPortMap.clear();
+ }
this.sinks.clear();
if (this.source != null) {
this.source.getOperatorMeta().outputStreams.remove(this.source);
@@ -516,6 +542,169 @@ public class LogicalPlan implements Serializable, DAG
return !((this.id == null) ? (other.id != null) : !this.id.equals(other.id));
}
+ @Override
+ public StreamMeta persistUsing(String name, Operator persistOperator, InputPort<?> port)
+ {
+ persistOperatorName = name;
+ enablePersistingForSinksAddedSoFar(persistOperator);
+ OperatorMeta persistOpMeta = createPersistOperatorMeta(persistOperator);
+ if (!persistOpMeta.getPortMapping().inPortMap.containsKey(port)) {
+ String msg = String.format("Port argument %s does not belong to
persist operator passed %s", port, persistOperator);
+ throw new IllegalArgumentException(msg);
+ }
+
+
setPersistOperatorInputPort(persistOpMeta.getPortMapping().inPortMap.get(port));
+
+ return this;
+ }
+
+ @Override
+ public StreamMeta persistUsing(String name, Operator persistOperator)
+ {
+ persistOperatorName = name;
+ enablePersistingForSinksAddedSoFar(persistOperator);
+ OperatorMeta persistOpMeta = createPersistOperatorMeta(persistOperator);
+ InputPortMeta port = persistOpMeta.getPortMapping().inPortMap.values().iterator().next();
+ setPersistOperatorInputPort(port);
+ return this;
+ }
+
+ private void enablePersistingForSinksAddedSoFar(Operator persistOperator)
+ {
+ for (InputPortMeta portMeta : getSinks()) {
+ enableSinksForPersisting.add(portMeta);
+ }
+ }
+
+ private OperatorMeta createPersistOperatorMeta(Operator persistOperator)
+ {
+ addOperator(persistOperatorName, persistOperator);
+ OperatorMeta persistOpMeta = getOperatorMeta(persistOperatorName);
+ setPersistOperator(persistOpMeta);
+ if (persistOpMeta.getPortMapping().inPortMap.isEmpty()) {
+ String msg = String.format("Persist operator passed %s has no input ports to connect", persistOperator);
+ throw new IllegalArgumentException(msg);
+ }
+ Map<InputPort<?>, InputPortMeta> inputPortMap = persistOpMeta.getPortMapping().inPortMap;
+ int nonOptionalInputPortCount = 0;
+ for (InputPortMeta inputPort : inputPortMap.values()) {
+ if (inputPort.portAnnotation == null || !inputPort.portAnnotation.optional()) {
+ // By default input port is non-optional unless specified
+ nonOptionalInputPortCount++;
+ }
+ }
+
+ if (nonOptionalInputPortCount > 1) {
+ String msg = String.format("Persist operator %s has more than 1 non optional input port", persistOperator);
+ throw new IllegalArgumentException(msg);
+ }
+
+ Map<OutputPort<?>, OutputPortMeta> outputPortMap = persistOpMeta.getPortMapping().outPortMap;
+ for (OutputPortMeta outPort : outputPortMap.values()) {
+ if (outPort.portAnnotation != null && !outPort.portAnnotation.optional()) {
+ // By default output port is optional unless specified
+ String msg = String.format("Persist operator %s has non optional output port %s", persistOperator, outPort.fieldName);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+ return persistOpMeta;
+ }
+
+ public OperatorMeta getPersistOperator()
+ {
+ return persistOperatorForStream;
+ }
+
+ private void setPersistOperator(OperatorMeta persistOperator)
+ {
+ this.persistOperatorForStream = persistOperator;
+ }
+
+ public InputPortMeta getPersistOperatorInputPort()
+ {
+ return persistOperatorInputPort;
+ }
+
+ private void setPersistOperatorInputPort(InputPortMeta inport)
+ {
+ this.addSink(inport.getPortObject());
+ this.persistOperatorInputPort = inport;
+ }
+
+ public Set<InputPortMeta> getSinksToPersist()
+ {
+ return enableSinksForPersisting;
+ }
+
+ private String getPersistOperatorName(Operator operator)
+ {
+ return id + "_persister";
+ }
+
+ private String getPersistOperatorName(InputPort<?> sinkToPersist)
+ {
+ InputPortMeta portMeta = assertGetPortMeta(sinkToPersist);
+ OperatorMeta operatorMeta = portMeta.getOperatorWrapper();
+ return id + "_" + operatorMeta.getName() + "_persister";
+ }
+
+ @Override
+ public StreamMeta persistUsing(String name, Operator persistOperator, InputPort<?> port, InputPort<?> sinkToPersist)
+ {
+ // When persistUsing is invoked for a specific sink, the persist operator can be added directly
+ String persistOperatorName = name;
+ addOperator(persistOperatorName, persistOperator);
+ addSink(port);
+ InputPortMeta sinkPortMeta = assertGetPortMeta(sinkToPersist);
+ addStreamCodec(sinkPortMeta, port);
+ updateSinkSpecificPersistOperatorMap(sinkPortMeta, persistOperatorName, port);
+ return this;
+ }
+
+ private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port)
+ {
+ StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null ? (StreamCodec<Object>) sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec();
+ if (inputStreamCodec != null) {
+ Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<InputPortMeta, StreamCodec<Object>>();
+ codecs.put(sinkToPersistPortMeta, inputStreamCodec);
+ InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
+ StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec();
+ StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
+ setInputPortAttribute(port, PortContext.STREAM_CODEC, codec);
+ }
+ }
+
+ private void updateSinkSpecificPersistOperatorMap(InputPortMeta sinkToPersistPortMeta, String persistOperatorName, InputPort<?> persistOperatorInPort)
+ {
+ OperatorMeta persistOpMeta = operators.get(persistOperatorName);
+ this.sinkSpecificPersistOperatorMap.put(sinkToPersistPortMeta, persistOpMeta);
+ this.sinkSpecificPersistInputPortMap.put(sinkToPersistPortMeta, persistOpMeta.getMeta(persistOperatorInPort));
+ }
+
+ public void resetStreamPersistanceOnSinkRemoval(InputPortMeta sinkBeingRemoved)
+ {
+ /*
+ * If persistStream was enabled for the entire stream and the operator
+ * being removed was the only one enabled for persisting, remove the persist operator
+ */
+ if (enableSinksForPersisting.contains(sinkBeingRemoved)) {
+ enableSinksForPersisting.remove(sinkBeingRemoved);
+ if (enableSinksForPersisting.isEmpty()) {
+ removeOperator(getPersistOperator().getOperator());
+ setPersistOperator(null);
+ }
+ }
+
+ // If persisting was added specific to this sink, remove the persist operator
+ if (sinkSpecificPersistInputPortMap.containsKey(sinkBeingRemoved)) {
+ sinkSpecificPersistInputPortMap.remove(sinkBeingRemoved);
+ }
+ if (sinkSpecificPersistOperatorMap.containsKey(sinkBeingRemoved)) {
+ OperatorMeta persistOpMeta = sinkSpecificPersistOperatorMap.get(sinkBeingRemoved);
+ sinkSpecificPersistOperatorMap.remove(sinkBeingRemoved);
+ removeOperator(persistOpMeta.getOperator());
+ }
+ }
}
/**
@@ -878,9 +1067,12 @@ public class LogicalPlan implements Serializable, DAG
Map<InputPortMeta, StreamMeta> inputStreams = om.getInputStreams();
for (Map.Entry<InputPortMeta, StreamMeta> e : inputStreams.entrySet()) {
+ StreamMeta stream = e.getValue();
if (e.getKey().getOperatorWrapper() == om) {
- e.getValue().sinks.remove(e.getKey());
+ stream.sinks.remove(e.getKey());
}
+ // If persistStream was enabled for the stream, reset it when a sink is removed
+ stream.resetStreamPersistanceOnSinkRemoval(e.getKey());
}
this.operators.remove(om.getName());
rootOperators.remove(om);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f68d249c/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
new file mode 100644
index 0000000..97fd75f
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
@@ -0,0 +1,89 @@
+package com.datatorrent.stram.plan.logical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.datatorrent.api.Partitioner.PartitionKeys;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
+
+public class StreamCodecWrapperForPersistance<T> implements StreamCodec<T>, Serializable
+{
+
+ private StreamCodec<Object> specifiedStreamCodec;
+ public Map<InputPortMeta, Collection<PartitionKeys>> inputPortToPartitionMap;
+ public Map<InputPortMeta, StreamCodec<Object>> codecsToMerge;
+ private boolean operatorPartitioned;
+
+ public StreamCodecWrapperForPersistance(Map<InputPortMeta, StreamCodec<Object>> inputStreamCodecs, StreamCodec<Object> specifiedStreamCodec)
+ {
+ this.codecsToMerge = inputStreamCodecs;
+ this.setSpecifiedStreamCodec(specifiedStreamCodec);
+ inputPortToPartitionMap = new HashMap<InputPortMeta, Collection<PartitionKeys>>();
+ }
+
+ @Override
+ public Object fromByteArray(Slice fragment)
+ {
+ return getSpecifiedStreamCodec().fromByteArray(fragment);
+ }
+
+ @Override
+ public Slice toByteArray(T o)
+ {
+ return getSpecifiedStreamCodec().toByteArray(o);
+ }
+
+ @Override
+ public int getPartition(T o)
+ {
+ return getSpecifiedStreamCodec().getPartition(o);
+ }
+
+ public boolean shouldCaptureEvent(T o)
+ {
+ for (Entry<InputPortMeta, Collection<PartitionKeys>> entry : inputPortToPartitionMap.entrySet()) {
+ StreamCodec<Object> codec = codecsToMerge.get(entry.getKey());
+ Collection<PartitionKeys> partitionKeysList = entry.getValue();
+
+ for (PartitionKeys keys : partitionKeysList) {
+ if (keys.partitions.contains(keys.mask & codec.getPartition(o))) {
+ // Then at least one of the partitions is getting this event
+ // So send the event to persist operator
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ public StreamCodec<Object> getSpecifiedStreamCodec()
+ {
+ if (specifiedStreamCodec == null) {
+ specifiedStreamCodec = new DefaultKryoStreamCodec();
+ }
+ return specifiedStreamCodec;
+ }
+
+ public void setSpecifiedStreamCodec(StreamCodec<Object> specifiedStreamCodec)
+ {
+ this.specifiedStreamCodec = specifiedStreamCodec;
+ }
+
+ public boolean isOperatorPartitioned()
+ {
+ return operatorPartitioned;
+ }
+
+ public void setOperatorPartitioned(boolean operatorPartitioned)
+ {
+ this.operatorPartitioned = operatorPartitioned;
+ }
+
+}
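The capture test in shouldCaptureEvent, as a worked example (mask and key
values are illustrative): with partition mask 0x03 and a sink whose
partition key set is {1}, a tuple for which the sink's codec returns
partition 5 is captured because (5 & 0x03) == 1 is in the set:

    int mask = 0x03;
    java.util.Set<Integer> partitions = java.util.Collections.singleton(1);
    int partition = 5;                                        // codec.getPartition(o)
    boolean captured = partitions.contains(mask & partition); // true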
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f68d249c/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index a57a248..2176035 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -18,6 +18,7 @@ package com.datatorrent.stram.plan.physical;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +43,7 @@ import com.datatorrent.api.StatsListener.OperatorRequest;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.AsyncFSStorageAgent;
+import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.stram.Journal.Recoverable;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.StramEvent;
@@ -51,6 +53,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
+import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.datatorrent.stram.plan.physical.PTOperator.HostOperatorSet;
import com.datatorrent.stram.plan.physical.PTOperator.PTInput;
import com.datatorrent.stram.plan.physical.PTOperator.PTOutput;
@@ -304,6 +307,9 @@ public class PhysicalPlan implements Serializable
Stack<OperatorMeta> pendingNodes = new Stack<OperatorMeta>();
+ // Add logging operators for streams if not added already
+ updatePersistOperatorStreamCodec(dag);
+
for (OperatorMeta n : dag.getAllOperators()) {
pendingNodes.push(n);
}
@@ -332,6 +338,8 @@ public class PhysicalPlan implements Serializable
}
}
+ updatePartitionsInfoForPersistOperator(dag);
+
// assign operators to containers
int groupCount = 0;
Set<PTOperator> deployOperators = Sets.newHashSet();
@@ -371,6 +379,106 @@ public class PhysicalPlan implements Serializable
this.undeployOpers.clear();
}
+ private void updatePartitionsInfoForPersistOperator(LogicalPlan dag)
+ {
+ // Add Partition mask and partition keys of Sinks to persist to Wrapper
+ // StreamCodec for persist operator
+ try {
+ for (OperatorMeta n : dag.getAllOperators()) {
+ for (StreamMeta s : n.getOutputStreams().values()) {
+ if (s.getPersistOperator() != null) {
+ InputPortMeta persistInputPort = s.getPersistOperatorInputPort();
+ StreamCodecWrapperForPersistance persistCodec = (StreamCodecWrapperForPersistance) persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
+ if (persistCodec == null) {
+ continue;
+ }
+ // Logging is enabled for the stream
+ for (InputPortMeta portMeta : s.getSinksToPersist()) {
+ updatePersistOperatorWithSinkPartitions(persistInputPort, s.getPersistOperator(), persistCodec, portMeta);
+ }
+ }
+
+ // Check partitioning for per-sink persist operators too
+ for (Entry<InputPortMeta, InputPortMeta> entry : s.sinkSpecificPersistInputPortMap.entrySet()) {
+ InputPortMeta persistInputPort = entry.getValue();
+ StreamCodec codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
+ if (codec instanceof StreamCodecWrapperForPersistance) {
+ StreamCodecWrapperForPersistance persistCodec = (StreamCodecWrapperForPersistance) codec;
+ updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), persistCodec, entry.getKey());
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ DTThrowable.wrapIfChecked(e);
+ }
+ }
+
+ private void updatePersistOperatorWithSinkPartitions(InputPortMeta persistInputPort, OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance persistCodec, InputPortMeta sinkPortMeta)
+ {
+ Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorWrapper());
+ Collection<PartitionKeys> partitionKeysList = new ArrayList<PartitionKeys>();
+ for (PTOperator p : ptOperators) {
+ PartitionKeys keys = (PartitionKeys) p.getPartitionKeys().get(sinkPortMeta.getPortObject());
+ partitionKeysList.add(keys);
+ }
+
+ persistCodec.inputPortToPartitionMap.put(sinkPortMeta, partitionKeysList);
+ }
+
+ private void updatePersistOperatorStreamCodec(LogicalPlan dag)
+ {
+ HashMap<StreamMeta, StreamCodec<?>> streamMetaToCodecMap = new HashMap<StreamMeta, StreamCodec<?>>();
+ try {
+ for (OperatorMeta n : dag.getAllOperators()) {
+ for (StreamMeta s : n.getOutputStreams().values()) {
+ if (s.getPersistOperator() != null) {
+ Map<InputPortMeta, StreamCodec<Object>> inputStreamCodecs = new HashMap<InputPortMeta, StreamCodec<Object>>();
+ // Logging is enabled for the stream
+ for (InputPortMeta portMeta : s.getSinksToPersist()) {
+ InputPort<?> port = portMeta.getPortObject();
+ StreamCodec<?> inputStreamCodec = (portMeta.getValue(PortContext.STREAM_CODEC) != null) ? portMeta.getValue(PortContext.STREAM_CODEC) : port.getStreamCodec();
+ if (inputStreamCodec != null) {
+ boolean alreadyAdded = false;
+
+ for (StreamCodec<?> codec : inputStreamCodecs.values()) {
+ if (inputStreamCodec.equals(codec)) {
+ alreadyAdded = true;
+ break;
+ }
+ }
+ if (!alreadyAdded) {
+ inputStreamCodecs.put(portMeta, (StreamCodec<Object>) inputStreamCodec);
+ }
+ }
+ }
+
+ if (inputStreamCodecs.isEmpty()) {
+ // No stream codec was specified, so everything out of the source
+ // should be captured without any StreamCodec; do nothing
+ } else {
+ // Create a wrapper codec for stream persistence using all unique
+ // stream codecs; the persister should capture the merged (union)
+ // output across all input stream codecs
+ StreamCodec<Object> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? (StreamCodec<Object>) s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) s.getPersistOperatorInputPort().getPortObject().getStreamCodec();
+ StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(inputStreamCodecs, specifiedCodecForLogger);
+ streamMetaToCodecMap.put(s, codec);
+ }
+ }
+ }
+ }
+
+ for (java.util.Map.Entry<StreamMeta, StreamCodec<?>> entry : streamMetaToCodecMap.entrySet()) {
+ dag.setInputPortAttribute(entry.getKey().getPersistOperatorInputPort().getPortObject(), PortContext.STREAM_CODEC, entry.getValue());
+ }
+ } catch (Exception e) {
+ DTThrowable.wrapIfChecked(e);
+ }
+ }
+
private void setContainer(PTOperator pOperator, PTContainer container) {
LOG.debug("Setting container {} for {}", container, pOperator);
assert (pOperator.container == null) : "Container already assigned for " + pOperator;
@@ -856,7 +964,7 @@ public class PhysicalPlan implements Serializable
Set<PTContainer> newContainers = Sets.newHashSet();
Set<PTContainer> releaseContainers = Sets.newHashSet();
assignContainers(newContainers, releaseContainers);
-
+ updatePartitionsInfoForPersistOperator(this.dag);
this.undeployOpers.removeAll(newOpers.keySet());
//make sure all the new operators are included in deploy operator list
this.deployOpers.addAll(this.newOpers.keySet());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f68d249c/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
index 56cb323..8bce425 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
@@ -37,6 +37,7 @@ import com.datatorrent.stram.engine.ByteCounterStream;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.engine.WindowGenerator;
+import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.datatorrent.stram.tuple.*;
/**
@@ -169,6 +170,22 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
return r;
}
+ public SweepableReservoir acquireReservoirForPersistStream(String id, int capacity, StreamCodec<?> streamCodec)
+ {
+ BufferReservoir r = reservoirMap.get(id);
+ if (r == null) {
+ reservoirMap.put(id, r = new BufferReservoirForPersistStream(capacity, (StreamCodecWrapperForPersistance<Object>) streamCodec));
+ BufferReservoir[] newReservoirs = new BufferReservoir[reservoirs.length + 1];
+ newReservoirs[reservoirs.length] = r;
+ for (int i = reservoirs.length; i-- > 0;) {
+ newReservoirs[i] = reservoirs[i];
+ }
+ reservoirs = newReservoirs;
+ }
+
+ return r;
+ }
+
@Override
public void put(Object tuple)
{
@@ -212,6 +229,7 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
class BufferReservoir extends CircularBuffer<Object> implements SweepableReservoir
{
+ protected boolean skipObject = false;
private Sink<Object> sink;
int count;
@@ -292,13 +310,7 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
break;
case PAYLOAD:
- if (statefulSerde == null) {
- o = serde.fromByteArray(data.getData());
- }
- else {
- dsp.data = data.getData();
- o = statefulSerde.fromDataStatePair(dsp);
- }
+ o = processPayload(data);
break;
case CHECKPOINT:
@@ -326,8 +338,12 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
}
freeFragments.offer(fm);
- for (int i = reservoirs.length; i-- > 0;) {
- reservoirs[i].add(o);
+ if (skipObject) {
+ skipObject = false;
+ } else {
+ for (int i = reservoirs.length; i-- > 0;) {
+ reservoirs[i].add(o);
+ }
}
}
}
@@ -335,6 +351,18 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
return null;
}
+ protected Object processPayload(com.datatorrent.bufferserver.packet.Tuple data)
+ {
+ Object o;
+ if (statefulSerde == null) {
+ o = serde.fromByteArray(data.getData());
+ } else {
+ dsp.data = data.getData();
+ o = statefulSerde.fromDataStatePair(dsp);
+ }
+ return o;
+ }
+
@Override
public int getCount(boolean reset)
{
@@ -350,5 +378,27 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
}
+ public class BufferReservoirForPersistStream extends BufferReservoir
+ {
+ StreamCodecWrapperForPersistance wrapperStreamCodec;
+
+ BufferReservoirForPersistStream(int capacity, StreamCodecWrapperForPersistance<Object> streamCodec)
+ {
+ super(capacity);
+ wrapperStreamCodec = streamCodec;
+ }
+
+ @Override
+ protected Object processPayload(com.datatorrent.bufferserver.packet.Tuple data)
+ {
+ Object o = wrapperStreamCodec.fromByteArray(data.getData());
+ if (!wrapperStreamCodec.shouldCaptureEvent(o)) {
+ skipObject = true;
+ }
+
+ return o;
+ }
+ }
+
private static final Logger logger = LoggerFactory.getLogger(BufferServerSubscriber.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f68d249c/engine/src/main/java/com/datatorrent/stram/stream/PartitionAwareSink.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/PartitionAwareSink.java b/engine/src/main/java/com/datatorrent/stram/stream/PartitionAwareSink.java
index 11c6ac6..6f5f3cd 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/PartitionAwareSink.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/PartitionAwareSink.java
@@ -18,6 +18,7 @@ package com.datatorrent.stram.stream;
import com.datatorrent.stram.tuple.Tuple;
import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamCodec;
+
import java.util.Set;
/**
@@ -60,12 +61,17 @@ public class PartitionAwareSink<T> implements Sink<T>
count++;
output.put(payload);
}
- else if (partitions.contains(serde.getPartition(payload) & mask)) {
+ else if (canSendToOutput(payload)) {
count++;
output.put(payload);
}
}
+ protected boolean canSendToOutput(T payload)
+ {
+ return partitions.contains(serde.getPartition(payload) & mask);
+ }
+
@Override
public int getCount(boolean reset)
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f68d249c/engine/src/main/java/com/datatorrent/stram/stream/PartitionAwareSinkForPersistence.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/PartitionAwareSinkForPersistence.java b/engine/src/main/java/com/datatorrent/stram/stream/PartitionAwareSinkForPersistence.java
new file mode 100644
index 0000000..f3852b7
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/stream/PartitionAwareSinkForPersistence.java
@@ -0,0 +1,45 @@
+package com.datatorrent.stram.stream;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.datatorrent.api.Sink;
+import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
+
+public class PartitionAwareSinkForPersistence extends PartitionAwareSink<Object>
+{
+ StreamCodecWrapperForPersistance<Object> serdeForPersistence;
+
+ public PartitionAwareSinkForPersistence(StreamCodecWrapperForPersistance<Object> serde, Set<Integer> partitions, int mask, Sink<Object> output)
+ {
+ super(serde, partitions, mask, output);
+ serdeForPersistence = serde;
+ }
+
+ public PartitionAwareSinkForPersistence(StreamCodecWrapperForPersistance<Object> serde, int mask, Sink<Object> output)
+ {
+ // If partition keys are null, everything should be passed to the sink
+ super(serde, createPartitionKeys(mask), mask, output);
+ serdeForPersistence = serde;
+ }
+
+ private static Set<Integer> createPartitionKeys(int mask)
+ {
+ Set<Integer> partitions = new HashSet<Integer>();
+ // Add every value covered by the mask to the partition keys
+ for (int i = 0; i <= mask; i++) {
+ partitions.add(i);
+ }
+ return partitions;
+ }
+
+ @Override
+ protected boolean canSendToOutput(Object payload)
+ {
+ if (!serdeForPersistence.shouldCaptureEvent(payload)) {
+ return false;
+ }
+
+ return super.canSendToOutput(payload);
+ }
+}
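A note on the mask-only constructor above: enumerating 0..mask yields every
possible masked value, so the base class partition check always passes and
filtering is left entirely to shouldCaptureEvent. A small sketch (mask value
illustrative):

    int mask = 0x03;
    java.util.Set<Integer> keys = new java.util.HashSet<Integer>();
    for (int i = 0; i <= mask; i++) {
      keys.add(i); // {0, 1, 2, 3}
    }
    boolean passesBaseCheck = keys.contains(5 & mask); // true for any partition value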
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f68d249c/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
new file mode 100644
index 0000000..c82f3a9
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -0,0 +1,988 @@
+package com.datatorrent.stram.plan;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DAG.OperatorMeta;
+import com.datatorrent.api.DAG.StreamMeta;
+import com.datatorrent.api.Partitioner.PartitionKeys;
+import com.datatorrent.api.*;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.netlet.util.DTThrowable;
+import com.datatorrent.stram.PartitioningTest;
+import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.StreamingContainerManager;
+import com.datatorrent.stram.StreamingContainerManagerTest;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+import com.datatorrent.stram.plan.logical.DefaultKryoStreamCodec;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
+import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
+import com.datatorrent.stram.plan.physical.PTContainer;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class StreamPersistanceTests
+{
+ final static Logger logger = LoggerFactory.getLogger(StreamPersistanceTests.class);
+
+ public static class TestRecieverOperator extends BaseOperator
+ {
+ public static volatile List<Object> results = new ArrayList<Object>();
+ public volatile AtomicInteger size = new AtomicInteger(0);
+
+ @InputPortFieldAnnotation(optional = true)
+ final public transient InputPort<Object> inport = new DefaultInputPort<Object>()
+ {
+ @Override
+ final public void process(Object t)
+ {
+ results.add(t);
+ size.incrementAndGet();
+ }
+ };
+ }
+
+ public static class TestPersistanceOperator implements Operator
+ {
+ public static volatile List<Object> results = new ArrayList<Object>();
+
+ @InputPortFieldAnnotation(optional = true)
+ final public transient InputPort<Object> inport = new DefaultInputPort<Object>()
+ {
+ @Override
+ final public void process(Object t)
+ {
+ results.add(t);
+ }
+ };
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ }
+
+ }
+
+ public static class PartitionedTestPersistanceOperator extends TestPersistanceOperator implements Partitioner<PartitionedTestPersistanceOperator>
+ {
+ @Override
+ public Collection definePartitions(Collection partitions, PartitioningContext context)
+ {
+ Collection<Partition> newPartitions = new ArrayList<Partition>();
+
+ int partitionMask = 0x03;
+
+ // No partitioning done so far..
+ // Single partition with mask 0x03 and set {0}
+ // First partition
+ PartitionedTestPersistanceOperator newInstance = new PartitionedTestPersistanceOperator();
+ Partition partition = new DefaultPartition<PartitionedTestPersistanceOperator>(newInstance);
+ PartitionKeys value = new PartitionKeys(partitionMask, Sets.newHashSet(0));
+ partition.getPartitionKeys().put(inport, value);
+ newPartitions.add(partition);
+
+ return newPartitions;
+ }
+
+ @Override
+ public void partitioned(Map partitions)
+ {
+ // TODO Auto-generated method stub
+ }
+ }
+
+ public class TestOperatorWithOutputPorts extends BaseOperator
+ {
+
+ @InputPortFieldAnnotation(optional = true)
+ final public transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>()
+ {
+ @Override
+ final public void process(Object t)
+ {
+ // Do nothing: Dummy operator for test
+ }
+ };
+
+ @OutputPortFieldAnnotation(optional = false)
+ final public transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>();
+ }
+
+ public class TestOperatorWithMultipleNonOptionalInputPorts extends BaseOperator
+ {
+
+ @InputPortFieldAnnotation(optional = false)
+ final public transient DefaultInputPort<Object> inputPort1 = new DefaultInputPort<Object>()
+ {
+ @Override
+ final public void process(Object t)
+ {
+ // Do nothing: Dummy operator for test
+ }
+ };
+
+ @InputPortFieldAnnotation(optional = false)
+ final public transient DefaultInputPort<Object> inputPort2 = new DefaultInputPort<Object>()
+ {
+ @Override
+ final public void process(Object t)
+ {
+ // Do nothing: Dummy operator for test
+ }
+ };
+
+ final public transient DefaultInputPort<Object> inputPort3 = new DefaultInputPort<Object>()
+ {
+ @Override
+ final public void process(Object t)
+ {
+ // Do nothing: Dummy operator for test
+ }
+ };
+ }
+
+ public class TestOperatorWithoutInputPorts extends BaseOperator
+ {
+ }
+
+ @Test
+ public void testPersistStreamOperatorIsAdded()
+ {
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input1 = dag.addOperator("input1",
TestGeneratorInputOperator.class);
+ GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
+ TestRecieverOperator persister = new TestRecieverOperator();
+ StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1);
+ stream.persistUsing("Stream1_persister",persister, persister.inport);
+
+ // Check operator is added to dag
+ OperatorMeta persistOperatorMeta =
dag.getOperatorMeta("Stream1_persister");
+ assertEquals("Persist operator not added to dag ", persister,
persistOperatorMeta.getOperator());
+ dag.validate();
+ }
+
+ @Test
+ public void testPersistStreamOperatorIsAddedPerSink()
+ {
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input1 = dag.addOperator("input1",
TestGeneratorInputOperator.class);
+ GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
+ GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
+ GenericTestOperator x3 = dag.addOperator("x3", new GenericTestOperator());
+
+ TestRecieverOperator persister = new TestRecieverOperator();
+ TestRecieverOperator persister1 = new TestRecieverOperator();
+ TestRecieverOperator persister2 = new TestRecieverOperator();
+
+ StreamMeta stream = dag.addStream("Stream1", input1.outport, x1.inport1,
x2.inport1, x3.inport1);
+
+ stream.persistUsing("Stream1_persister", persister, persister.inport);
+ stream.persistUsing("Stream1_x1_persister", persister1, persister1.inport,
x1.inport1);
+ stream.persistUsing("Stream1_x2_persister", persister2, persister2.inport,
x2.inport1);
+
+ // Check 3 persist operators are added to dag
+ OperatorMeta persistOperatorMeta =
dag.getOperatorMeta("Stream1_persister");
+ assertEquals("Persist operator not added to dag ", persister,
persistOperatorMeta.getOperator());
+
+ persistOperatorMeta = dag.getOperatorMeta("Stream1_x1_persister");
+ assertEquals("Persist operator not added to dag ", persister1,
persistOperatorMeta.getOperator());
+
+ persistOperatorMeta = dag.getOperatorMeta("Stream1_x2_persister");
+ assertEquals("Persist operator not added to dag ", persister2,
persistOperatorMeta.getOperator());
+
+ dag.validate();
+ }
+
+ @Test
+ public void testaddStreamThrowsExceptionOnInvalidLoggerType()
+ {
+ // Test Logger with non-optional output ports
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input1 = dag.addOperator("input1",
TestGeneratorInputOperator.class);
+ GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
+ StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1);
+
+ TestOperatorWithOutputPorts persister = new TestOperatorWithOutputPorts();
+ try {
+ stream.persistUsing("persister", persister, persister.inputPort);
+ Assert.fail("should throw Illegal argument exception: Persist operator
has non optional output ports");
+ } catch (IllegalArgumentException e) {
+ logger.debug(e.getMessage());
+ }
+
+ // Test already added operator passed
+ TestOperatorWithOutputPorts persister1 = new TestOperatorWithOutputPorts();
+ try {
+ stream.persistUsing("Stream1_persister", persister1,
persister1.inputPort);
+ Assert.fail("should throw exception that Stream1_persister object was
already added");
+ } catch (IllegalArgumentException e) {
+ logger.debug(e.getMessage());
+ }
+
+ // Test persist operator without any input ports
+ dag.removeOperator(dag.getOperatorMeta("Stream1_persister").getOperator());
+ TestOperatorWithoutInputPorts logger2 = new
TestOperatorWithoutInputPorts();
+ try {
+ stream.persistUsing("Stream1_persister", logger2);
+ Assert.fail("should throw Illegal argument exception: persist operator
should have input ports");
+ } catch (IllegalArgumentException e) {
+ logger.debug(e.getMessage());
+ }
+
+ // Test persist operator with more than one input port as non-optional
+ dag.removeOperator(dag.getOperatorMeta("Stream1_persister").getOperator());
+ TestOperatorWithMultipleNonOptionalInputPorts persister3 = new
TestOperatorWithMultipleNonOptionalInputPorts();
+ try {
+ stream.persistUsing("Stream1_persister", persister3);
+ Assert.fail("should throw Illegal argument exception: persist operator
should have at most 1 non-optional input port");
+ } catch (IllegalArgumentException e) {
+ logger.debug(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testaddStreamThrowsExceptionOnInvalidInputPortForLoggerType()
+ {
+ // Test for input port belonging to different object
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input1 = dag.addOperator("input1",
TestGeneratorInputOperator.class);
+ GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
+ TestRecieverOperator persister = new TestRecieverOperator();
+ TestRecieverOperator persister1 = new TestRecieverOperator();
+ StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1);
+ try {
+ stream.persistUsing("Stream1_persister", persister, persister1.inport);
+ Assert.fail("should throw Illegal argument exception: Port passed does
not belong to operator class");
+ } catch (IllegalArgumentException e) {
+ }
+
+ // Remove persist operator from dag
+ dag.removeOperator(dag.getOperatorMeta("Stream1_persister").getOperator());
+ }
+
+ @Test
+ public void testPersistStreamOperatorIsRemovedWhenStreamIsRemoved()
+ {
+ // Remove Stream and check if persist operator is removed
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input1 = dag.addOperator("input1",
TestGeneratorInputOperator.class);
+ GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
+ TestRecieverOperator persister = new TestRecieverOperator();
+ StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1);
+ stream.persistUsing("Stream1_persister", persister, persister.inport);
+
+ ((LogicalPlan.StreamMeta) stream).remove();
+
+ // Check operator is removed from dag
+ OperatorMeta persistOperatorMeta = dag.getOperatorMeta("Stream1_persister");
+ assertEquals("Persist operator should be removed from dag after stream.remove", null, persistOperatorMeta);
+ }
+
+ @Test
+ public void testPersistStreamOperatorIsRemovedWhenSinkIsRemoved()
+ {
+ // Remove sink and check if corresponding persist operator is removed
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input1 = dag.addOperator("input1",
TestGeneratorInputOperator.class);
+ GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
+ GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
+ GenericTestOperator x3 = dag.addOperator("x3", new GenericTestOperator());
+
+ TestRecieverOperator persister = new TestRecieverOperator();
+ TestRecieverOperator persister1 = new TestRecieverOperator();
+ TestRecieverOperator persister2 = new TestRecieverOperator();
+
+ StreamMeta stream = dag.addStream("Stream1", input1.outport, x1.inport1,
x2.inport1, x3.inport1);
+
+ stream.persistUsing("Stream1_persister", persister, persister.inport);
+ stream.persistUsing("Stream1_x1_persister", persister1, persister1.inport,
x1.inport1);
+ stream.persistUsing("Stream1_x2_persister", persister2, persister2.inport,
x2.inport1);
+
+ // Check 3 persist operators are added to dag
+ OperatorMeta persistOperatorMeta =
dag.getOperatorMeta("Stream1_persister");
+ assertEquals("Persist operator not added to dag ", persister,
persistOperatorMeta.getOperator());
+
+ persistOperatorMeta = dag.getOperatorMeta("Stream1_x1_persister");
+ assertEquals("Persist operator not added to dag ", persister1,
persistOperatorMeta.getOperator());
+
+ persistOperatorMeta = dag.getOperatorMeta("Stream1_x2_persister");
+ assertEquals("Persist operator not added to dag ", persister2,
persistOperatorMeta.getOperator());
+
+ dag.removeOperator(x1);
+ // Check persister for x1 is removed
+ persistOperatorMeta = dag.getOperatorMeta("Stream1_x1_persister");
+ assertEquals("Persist operator should be removed from dag after sink is
removed", null, persistOperatorMeta);
+
+ // Check other persisters are unchanged
+
+ persistOperatorMeta = dag.getOperatorMeta("Stream1_persister");
+ assertEquals("Persist operator not added to dag ", persister,
persistOperatorMeta.getOperator());
+
+ persistOperatorMeta = dag.getOperatorMeta("Stream1_x2_persister");
+ assertEquals("Persist operator not added to dag ", persister2,
persistOperatorMeta.getOperator());
+ }
+
+ @Test
+ public void testPersistStreamOperatorIsRemovedWhenAllSinksAreRemoved()
+ {
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input1 = dag.addOperator("input1",
TestGeneratorInputOperator.class);
+ GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
+ GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
+ GenericTestOperator x3 = dag.addOperator("x3", new GenericTestOperator());
+
+ TestRecieverOperator persister = new TestRecieverOperator();
+
+ StreamMeta stream = dag.addStream("Stream1", input1.outport, x1.inport1,
x2.inport1, x3.inport1);
+
+ stream.persistUsing("Stream1_persister", persister, persister.inport);
+
+ // Check stream persister is added to the dag
+ Assert.assertNotNull("Stream persister operator should be present",
dag.getOperatorMeta("Stream1_persister"));
+
+ // remove sink operators x1, x2, x3 from dag and check that persister
+ // operator is removed
+ dag.removeOperator(x1);
+ dag.removeOperator(x2);
+ dag.removeOperator(x3);
+ Assert.assertNull("Persister operator should have been removed after all
sinks are removed", dag.getOperatorMeta("Stream1_persister"));
+ }
+
+ @Test
+ public void testPersistStreamOperatorGeneratesIdenticalOutputAsSink() throws ClassNotFoundException, IOException, InterruptedException
+ {
+ LogicalPlan dag = new LogicalPlan();
+ AscendingNumbersOperator input1 = dag.addOperator("input1", AscendingNumbersOperator.class);
+ // Add PersistOperator directly to dag
+ final TestRecieverOperator x = dag.addOperator("x", new TestRecieverOperator());
+ StreamMeta stream = dag.addStream("Stream1", input1.outputPort, x.inport);
+
+ // Use an instance of PersistOperator to persist stream
+ TestPersistanceOperator persister = new TestPersistanceOperator();
+ stream.persistUsing("Stream1_persister", persister, persister.inport);
+
+ runLocalClusterAndValidate(dag, x, persister);
+ }
+
+ private void runLocalClusterAndValidate(LogicalPlan dag, final TestRecieverOperator x, final TestPersistanceOperator persister) throws IOException, ClassNotFoundException
+ {
+ try {
+ x.results.clear();
+ persister.results.clear();
+ // Run local cluster and verify both results are identical
+ final StramLocalCluster lc = new StramLocalCluster(dag);
+
+ new Thread("LocalClusterController")
+ {
+ @Override
+ public void run()
+ {
+ long startTms = System.currentTimeMillis();
+ long timeout = 100000L;
+ try {
+ while (System.currentTimeMillis() - startTms < timeout) {
+ if (x.results.size() < 1000) {
+ Thread.sleep(10);
+ } else {
+ break;
+ }
+ }
+ } catch (Exception ex) {
+ DTThrowable.rethrow(ex);
+ } finally {
+ lc.shutdown();
+ }
+ }
+
+ }.start();
+
+ lc.run();
+ int maxTuples = x.results.size() > persister.results.size() ? persister.results.size() : x.results.size();
+ // Output of both operators should be identical
+ for (int i = 0; i < maxTuples; i++) {
+ logger.debug("Tuple = " + x.results.get(i) + " - " + persister.results.get(i));
+ assertEquals("Mismatch observed for tuple ", x.results.get(i), persister.results.get(i));
+ }
+ } finally {
+ x.results.clear();
+ persister.results.clear();
+ }
+ }
+
+ public static class AscendingNumbersOperator implements InputOperator
+ {
+
+ private Integer count = 0;
+
+ @Override
+ public void emitTuples()
+ {
+
+ outputPort.emit(count++);
+ }
+
+ public final transient DefaultOutputPort<Integer> outputPort = new DefaultOutputPort<>();
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ };
+
+ public static class DivisibleByStreamCodec extends DefaultKryoStreamCodec
+ {
+
+ protected int number = 1;
+
+ public DivisibleByStreamCodec()
+ {
+ super();
+ }
+
+ public DivisibleByStreamCodec(int number)
+ {
+ super();
+ this.number = number;
+ }
+
+ @Override
+ public int getPartition(Object o)
+ {
+ if ((Integer) o % number == 0) {
+ return 1;
+ }
+ return 2;
+ }
+
+ }
+
+ public static class PassThruOperatorWithCodec extends BaseOperator implements Partitioner<PassThruOperatorWithCodec>
+ {
+
+ private int divisibleBy = 1;
+
+ public PassThruOperatorWithCodec()
+ {
+ }
+
+ public PassThruOperatorWithCodec(int divisibleBy)
+ {
+ this.divisibleBy = divisibleBy;
+ }
+
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object tuple)
+ {
+ output.emit(tuple);
+ }
+
+ @Override
+ public StreamCodec<Object> getStreamCodec()
+ {
+ return new DivisibleByStreamCodec(divisibleBy);
+ }
+ };
+
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+
+ @Override
+ public Collection definePartitions(Collection partitions, PartitioningContext context)
+ {
+ Collection<Partition> newPartitions = new ArrayList<Partition>();
+
+ // Mostly for 1 partition we don't need to do this
+ int partitionBits = (Integer.numberOfLeadingZeros(0) - Integer.numberOfLeadingZeros(1));
+ int partitionMask = 0;
+ if (partitionBits > 0) {
+ partitionMask = -1 >>> (Integer.numberOfLeadingZeros(-1)) - partitionBits;
+ }
+
+ partitionMask = 1;
+
+ if (partitions.size() == 1) {
+ // No partitioning done so far..
+ // Single partition again, but with only even numbers ok?
+ PassThruOperatorWithCodec newInstance = new
PassThruOperatorWithCodec();
+ Partition partition = new
DefaultPartition<PassThruOperatorWithCodec>(newInstance);
+
+ // Consider partitions are 1 & 2 and we are sending only 1 partition
+ // Partition 1 = even numbers
+ // Partition 2 = odd numbers
+ PartitionKeys value = new PartitionKeys(partitionMask,
Sets.newHashSet(1));
+ partition.getPartitionKeys().put(input, value);
+ newPartitions.add(partition);
+ }
+
+ return newPartitions;
+ }
+
+ @Override
+ public void partitioned(Map partitions)
+ {
+ // No action needed when partitioning changes
+ }
+ }
+
+ @Test
+ public void testPersistStreamWithFiltering() throws ClassNotFoundException, IOException, InterruptedException
+ {
+ LogicalPlan dag = new LogicalPlan();
+ AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
+ PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
+ TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
+ TestPersistanceOperator console1 = new TestPersistanceOperator();
+ StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru.input);
+ s.persistUsing("Stream1_persister", console1, console1.inport);
+ dag.addStream("Stream2", passThru.output, console.inport);
+ runLocalClusterAndValidate(dag, console, console1);
+ }
+
+ @Test
+ public void testPersistStreamOnSingleSinkWithFiltering() throws ClassNotFoundException, IOException, InterruptedException
+ {
+ LogicalPlan dag = new LogicalPlan();
+ AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
+ PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
+ final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
+
+ TestPersistanceOperator persister = new TestPersistanceOperator();
+ StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru.input);
+ s.persistUsing("Stream1_persister", persister, persister.inport,
passThru.input);
+ dag.addStream("Stream2", passThru.output, console.inport);
+ runLocalClusterAndValidate(dag, console, persister);
+ }
+
+ @Test
+ public void testPersistStreamOnSingleSinkWithFilteringContainerLocal() throws ClassNotFoundException, IOException, InterruptedException
+ {
+ LogicalPlan dag = new LogicalPlan();
+ AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
+ PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
+ PassThruOperatorWithCodec passThru2 = dag.addOperator("Multiples_of_3", new PassThruOperatorWithCodec(3));
+
+ final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
+ final TestRecieverOperator console1 = dag.addOperator("console1", new TestRecieverOperator());
+
+ TestPersistanceOperator persister = new TestPersistanceOperator();
+ StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru.input, passThru2.input).setLocality(Locality.CONTAINER_LOCAL);
+ s.persistUsing("persister", persister, persister.inport);
+ dag.addStream("Stream2", passThru.output, console.inport);
+ dag.addStream("Stream3", passThru2.output, console1.inport);
+
+ persister.results.clear();
+ console.results.clear();
+ console1.results.clear();
+
+ // Validate union of results is received on persist operator
+ final StramLocalCluster lc = new StramLocalCluster(dag);
+
+ new Thread("LocalClusterController")
+ {
+ @Override
+ public void run()
+ {
+ long startTms = System.currentTimeMillis();
+ long timeout = 100000L;
+ try {
+ while (System.currentTimeMillis() - startTms < timeout) {
+ if ((console.results.size() < 6) || (console1.results.size() < 6)) {
+ Thread.sleep(10);
+ } else {
+ break;
+ }
+ }
+ } catch (Exception ex) {
+ DTThrowable.rethrow(ex);
+ } finally {
+ lc.shutdown();
+ }
+ }
+
+ }.start();
+
+ lc.run();
+ try {
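+ // Persister captures the union of both sinks: multiples of 2 and multiples of 3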
+ Integer[] expectedResult = { 0, 2, 3, 4, 6, 8, 9, 10, 12 };
+ for (int i = 0; i < expectedResult.length; i++) {
+ logger.debug(persister.results.get(i) + " " + expectedResult[i]);
+ assertEquals("Mismatch observed for tuple ", expectedResult[i],
persister.results.get(i));
+ }
+ } finally {
+
+ persister.results.clear();
+ console.results.clear();
+ console1.results.clear();
+ }
+ }
+
+ @Test
+ public void testPersistStreamOperatorGeneratesUnionOfAllSinksOutput() throws ClassNotFoundException, IOException
+ {
+ LogicalPlan dag = new LogicalPlan();
+ AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
+ PassThruOperatorWithCodec passThru1 = dag.addOperator("PassThrough1", new PassThruOperatorWithCodec(2));
+ PassThruOperatorWithCodec passThru2 = dag.addOperator("PassThrough2", new PassThruOperatorWithCodec(3));
+
+ final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
+ final TestRecieverOperator console1 = dag.addOperator("console1", new TestRecieverOperator());
+
+ TestPersistanceOperator persister = new TestPersistanceOperator();
+ StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru1.input, passThru2.input);
+ s.persistUsing("persister", persister, persister.inport);
+
+ dag.addStream("Stream2", passThru1.output, console.inport);
+ dag.addStream("Stream3", passThru2.output, console1.inport);
+
+ persister.results.clear();
+ console.results.clear();
+ console1.results.clear();
+
+ // Validate union of results is received on persist operator
+ final StramLocalCluster lc = new StramLocalCluster(dag);
+
+ new Thread("LocalClusterController")
+ {
+ @Override
+ public void run()
+ {
+ long startTms = System.currentTimeMillis();
+ long timeout = 10000L;
+ try {
+ while (System.currentTimeMillis() - startTms < timeout) {
+ if ((console.results.size() < 6) || (console1.results.size() < 6)) {
+ Thread.sleep(10);
+ } else {
+ break;
+ }
+ }
+ } catch (Exception ex) {
+ DTThrowable.rethrow(ex);
+ } finally {
+ lc.shutdown();
+ }
+ }
+
+ }.start();
+
+ lc.run();
+ try {
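+ // Expected output is the union of the two sinks' data: multiples of 2 and of 3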
+ Integer[] expectedResult = { 0, 2, 3, 4, 6, 8, 9, 10, 12 };
+ for (int i = 0; i < expectedResult.length; i++) {
+ logger.debug(persister.results.get(i) + " " + expectedResult[i]);
+ assertEquals("Mismatch observed for tuple ", expectedResult[i],
persister.results.get(i));
+ }
+ } finally {
+
+ persister.results.clear();
+ console.results.clear();
+ console1.results.clear();
+ }
+ }
+
+ public static class TestPartitionCodec extends DefaultKryoStreamCodec
+ {
+
+ public TestPartitionCodec()
+ {
+ super();
+ }
+
+ @Override
+ public int getPartition(Object o)
+ {
+ return (int) o; // partition on the raw integer value; the engine applies the partition mask
+ }
+
+ }
+
+ public static class PartitionedTestOperatorWithFiltering extends BaseOperator implements Partitioner<PassThruOperatorWithCodec>
+ {
+
+ public PartitionedTestOperatorWithFiltering()
+ {
+ }
+
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object tuple)
+ {
+ output.emit(tuple);
+ }
+ };
+
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
+
+ @Override
+ public Collection definePartitions(Collection partitions, PartitioningContext context)
+ {
+ Collection<Partition> newPartitions = new ArrayList<Partition>();
+
+ int partitionMask = 0x03;
+
+ // Statically create two partitions under mask 0x03: one with key {0}, one with key {1}
+ // First partition
+ PassThruOperatorWithCodec newInstance = new PassThruOperatorWithCodec();
+ Partition partition = new DefaultPartition<PassThruOperatorWithCodec>(newInstance);
+ PartitionKeys value = new PartitionKeys(partitionMask, Sets.newHashSet(0));
+ partition.getPartitionKeys().put(input, value);
+ newPartitions.add(partition);
+
+ // Second partition
+ newInstance = new PassThruOperatorWithCodec();
+ partition = new DefaultPartition<PassThruOperatorWithCodec>(newInstance);
+ value = new PartitionKeys(partitionMask, Sets.newHashSet(1));
+ partition.getPartitionKeys().put(input, value);
+
+ newPartitions.add(partition);
+
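+ // Tuples with (value & 0x03) == 2 or 3 match neither partition and are dropped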
+ return newPartitions;
+ }
+
+ @Override
+ public void partitioned(Map partitions)
+ {
+ System.out.println("Dynamic partitioning done");
+ }
+ }
+
+ @Test
+ public void testPersistStreamOperatorMultiplePhysicalOperatorsForSink() throws ClassNotFoundException, IOException
+ {
+ LogicalPlan dag = new LogicalPlan();
+ AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
+ PartitionedTestOperatorWithFiltering passThru = dag.addOperator("partition", new PartitionedTestOperatorWithFiltering());
+ final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
+ final TestPersistanceOperator console1 = new TestPersistanceOperator();
+ StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru.input);
+ dag.setInputPortAttribute(passThru.input, PortContext.STREAM_CODEC, new TestPartitionCodec());
+ s.persistUsing("persister", console1, console1.inport);
+ dag.addStream("Stream2", passThru.output, console.inport);
+
+ final StramLocalCluster lc = new StramLocalCluster(dag);
+
+ new Thread("LocalClusterController")
+ {
+ @Override
+ public void run()
+ {
+ long startTms = System.currentTimeMillis();
+ long timeout = 100000L;
+ try {
+ while (System.currentTimeMillis() - startTms < timeout) {
+ if ((console.results.size() < 6) || (console1.results.size() < 6)) {
+ Thread.sleep(10);
+ } else {
+ break;
+ }
+ }
+ } catch (Exception ex) {
+ DTThrowable.rethrow(ex);
+ } finally {
+ lc.shutdown();
+ }
+ }
+
+ }.start();
+
+ lc.run();
+
+ try {
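+ // The two partitions keep values with (value & 0x03) == 0 or 1; the rest are filtered out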
+ Integer[] expectedResult = { 0, 1, 4, 5, 8, 9, 12, 13, 16 };
+
+ for (int i = 0; i < expectedResult.length; i++) {
+ logger.debug(console1.results.get(i) + " " + expectedResult[i]);
+ assertEquals("Mismatch observed for tuple ", expectedResult[i],
console1.results.get(i));
+ }
+ } finally {
+ console1.results.clear();
+ console.results.clear();
+ }
+ }
+
+ @Test
+ public void testPartitionedPersistOperator() throws ClassNotFoundException, IOException
+ {
+ LogicalPlan dag = new LogicalPlan();
+ AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
+ PartitionedTestOperatorWithFiltering passThru = dag.addOperator("partition", new PartitionedTestOperatorWithFiltering());
+ final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
+ final PartitionedTestPersistanceOperator console1 = new PartitionedTestPersistanceOperator();
+ StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru.input);
+ dag.setInputPortAttribute(passThru.input, PortContext.STREAM_CODEC, new TestPartitionCodec());
+ s.persistUsing("persister", console1, console1.inport);
+ dag.setInputPortAttribute(console1.inport, PortContext.STREAM_CODEC, new TestPartitionCodec());
+ dag.addStream("Stream2", passThru.output, console.inport);
+
+ final StramLocalCluster lc = new StramLocalCluster(dag);
+
+ new Thread("LocalClusterController")
+ {
+ @Override
+ public void run()
+ {
+ long startTms = System.currentTimeMillis();
+ long timeout = 100000L;
+ try {
+ while (System.currentTimeMillis() - startTms < timeout) {
+ if (console1.results.size() < 6) {
+ Thread.sleep(10);
+ } else {
+ break;
+ }
+ }
+ } catch (Exception ex) {
+ DTThrowable.rethrow(ex);
+ } finally {
+ lc.shutdown();
+ }
+ }
+
+ }.start();
+
+ lc.run();
+
+ try {
+ // Values as per persist operator's partition keys should be picked up
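+ // PartitionedTestPersistanceOperator (defined elsewhere in this test class) presumably keeps only tuples matching its own partition key, here (value & 0x03) == 0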
+ Integer[] expectedResult = { 0, 4, 8, 12, 16, 20 };
+
+ for (int i = 0; i < expectedResult.length; i++) {
+ logger.debug(console1.results.get(i) + " " + expectedResult[i]);
+ assertEquals("Mismatch observed for tuple ", expectedResult[i],
console1.results.get(i));
+ }
+ } finally {
+ console1.results.clear();
+ console.results.clear();
+ }
+ }
+
+ @Rule
+ public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
+
+ @Test
+ public void testDynamicPartitioning() throws ClassNotFoundException, IOException
+ {
+ LogicalPlan dag = new LogicalPlan();
+
+ dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
+
+ AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
+
+ final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
+ dag.setAttribute(console, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TestRecieverOperator>(2));
+ dag.setAttribute(console, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener) new PartitioningTest.PartitionLoadWatch()));
+
+ final PartitionedTestPersistanceOperator console1 = new PartitionedTestPersistanceOperator();
+
+ StreamMeta s = dag.addStream("Stream1", ascend.outputPort, console.inport);
+ dag.setInputPortAttribute(console.inport, PortContext.STREAM_CODEC, new TestPartitionCodec());
+ s.persistUsing("persister", console1, console1.inport);
+
+ dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
+ StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);
+
+ StreamingContainerManager dnm = new StreamingContainerManager(dag);
+ PhysicalPlan plan = dnm.getPhysicalPlan();
+
+ List<PTContainer> containers = plan.getContainers();
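+ // One container per physical operator: ascend, the two console partitions, and the persist operator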
+ Assert.assertEquals("number containers", 4, containers.size());
+
+ for (int i = 0; i < containers.size(); ++i) {
+ StreamingContainerManagerTest.assignContainer(dnm, "container" + (i +
1));
+ }
+
+ LogicalPlan.OperatorMeta passThruMeta = dag.getMeta(console);
+
+ List<PTOperator> ptos = plan.getOperators(passThruMeta);
+
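+ // Mark all deployed operators active so that subsequent repartitioning events are processed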
+ for (PTContainer container : plan.getContainers()) {
+ for (PTOperator operator : container.getOperators()) {
+ operator.setState(PTOperator.State.ACTIVE);
+ }
+ }
+
+ LogicalPlan.StreamMeta s1 = (LogicalPlan.StreamMeta) s;
+ StreamCodec codec = s1.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC);
+
+ assertEquals("Codec should be instance of StreamCodecWrapper", codec
instanceof StreamCodecWrapperForPersistance, true);
+ StreamCodecWrapperForPersistance wrapperCodec =
(StreamCodecWrapperForPersistance) codec;
+
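+ // The wrapper codec tracks the partition keys in effect for each sink input port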
+ Entry<InputPortMeta, Collection<PartitionKeys>> keys = (Entry<InputPortMeta, Collection<PartitionKeys>>) wrapperCodec.inputPortToPartitionMap.entrySet().iterator().next();
+ logger.debug(keys.toString());
+ assertEquals("Size of partitions should be 2", 2, keys.getValue().size());
+
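+ // A load hint of -1 marks each console partition underloaded, prompting a scale-down repartition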
+ for (PTOperator ptOperator : ptos) {
+ PartitioningTest.PartitionLoadWatch.put(ptOperator, -1);
+ plan.onStatusUpdate(ptOperator);
+ }
+
+ dnm.processEvents();
+
+ assertEquals("Input port map",
wrapperCodec.inputPortToPartitionMap.size(), 1);
+
+ keys = (Entry<InputPortMeta, Collection<PartitionKeys>>) wrapperCodec.inputPortToPartitionMap.entrySet().iterator().next();
+ assertEquals("Size of partitions should be 1 after repartition", 1, keys.getValue().size());
+ logger.debug(keys.toString());
+ }
+}