Repository: incubator-apex-malhar Updated Branches: refs/heads/master ab800233b -> 98495ab62
MLHR-1936 Adding NiFi operators to contrib Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2cb84942 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2cb84942 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2cb84942 Branch: refs/heads/master Commit: 2cb849422560e53973decc1e72285ac3bda357f9 Parents: 9e77ef7 Author: Bryan Bende <[email protected]> Authored: Mon Dec 14 17:34:59 2015 -0500 Committer: Bryan Bende <[email protected]> Committed: Tue Jan 26 16:04:32 2016 -0500 ---------------------------------------------------------------------- contrib/pom.xml | 6 + .../contrib/nifi/AbstractNiFiInputOperator.java | 212 +++++++++++++++++ .../nifi/AbstractNiFiOutputOperator.java | 187 +++++++++++++++ .../AbstractNiFiSinglePortInputOperator.java | 83 +++++++ .../contrib/nifi/NiFiDataPacket.java | 42 ++++ .../contrib/nifi/NiFiDataPacketBuilder.java | 33 +++ .../nifi/NiFiSinglePortInputOperator.java | 72 ++++++ .../nifi/NiFiSinglePortOutputOperator.java | 126 ++++++++++ .../contrib/nifi/StandardNiFiDataPacket.java | 59 +++++ .../nifi/NiFiSinglePortInputOperatorTest.java | 200 ++++++++++++++++ .../nifi/NiFiSinglePortOutputOperatorTest.java | 231 +++++++++++++++++++ .../nifi/demo/TestNiFiInputApplication.java | 63 +++++ .../nifi/demo/TestNiFiOutputApplication.java | 85 +++++++ .../contrib/nifi/mock/MockDataPacket.java | 58 +++++ .../contrib/nifi/mock/MockSiteToSiteClient.java | 102 ++++++++ .../contrib/nifi/mock/MockTransaction.java | 166 +++++++++++++ 16 files changed, 1725 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 16e28c8..17b6008 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -220,6 +220,12 @@ <dependencies> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-site-to-site-client</artifactId> + <version>0.4.1</version> + <optional>true</optional> + </dependency> + <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>janino</artifactId> <version>2.7.8</version> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java new file mode 100644 index 0000000..d0130f6 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.protocol.DataPacket; + +import com.datatorrent.api.Context; +import com.datatorrent.api.InputOperator; +import com.datatorrent.lib.util.WindowDataManager; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * This is the base implementation of a NiFi input operator. + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them. + * <p> + * Ports:<br> + * <b>Input</b>: No input port<br> + * <b>Output</b>: Can have any number of output ports<br> + * <br> + * Properties:<br> + * None<br> + * <br> + * Compile time checks:<br> + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples) + * and createTuple(DataPacket dp)<br> + * <br> + * Run time checks:<br> + * None<br> + * <br> + * Benchmarks:<br> + * TBD<br> + * </p> + * + * @displayName Abstract NiFi Input + * @category Messaging + * @tags input operator + */ + +public abstract class AbstractNiFiInputOperator<T> implements InputOperator +{ + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class); + + private transient SiteToSiteClient client; + private final SiteToSiteClient.Builder siteToSiteBuilder; + + private transient int operatorContextId; + private transient long currentWindowId; + private transient List<T> currentWindowTuples; + private transient List<T> recoveredTuples; + private final WindowDataManager windowDataManager; + + /** + * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient + * @param windowDataManager a WindowDataManager to save and load state for windows of tuples + */ + public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder, + final WindowDataManager windowDataManager) + { + this.siteToSiteBuilder = siteToSiteBuilder; + this.windowDataManager = windowDataManager; + } + + @Override + public void setup(Context.OperatorContext context) + { + this.client = siteToSiteBuilder.build(); + this.operatorContextId = context.getId(); + this.currentWindowTuples = new ArrayList<>(); + this.recoveredTuples = new ArrayList<>(); + this.windowDataManager.setup(context); + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + + // if the current window is now less than the largest window, then we need to replay data + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { + try { + List<T> recoveredData = (List<T>)this.windowDataManager.load(operatorContextId, windowId); + if (recoveredData == null) { + return; + } + + // if we recovered tuples then load them to be processed by next call to emitTuples() + recoveredTuples.addAll(recoveredData); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + } + + @Override + public void emitTuples() + { + // if we have recovered tuples we must be replaying a previous window so emit them, + // clear the recovered list, and return until we have no more recovered data + if (recoveredTuples.size() > 0) { + emitTuples(recoveredTuples); + recoveredTuples.clear(); + return; + } + + // no recovered data so start a transaction and pull new data + try { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + if (transaction == null) { + LOGGER.warn("A transaction could not be created, returning..."); + return; + } + + DataPacket dataPacket = transaction.receive(); + if (dataPacket == null) { + transaction.confirm(); + transaction.complete(); + LOGGER.debug("No data available to pull, returning and will try again..."); + return; + } + + // read all of the available data packets and convert to the given type + final List<T> tuples = new ArrayList<>(); + do { + tuples.add(createTuple(dataPacket)); + dataPacket = transaction.receive(); + } while (dataPacket != null); + + // confirm all of the expected data was received by comparing check-sums, does not complete the transaction + transaction.confirm(); + + // ensure we have the data saved before proceeding in case anything goes wrong + currentWindowTuples.addAll(tuples); + windowDataManager.save(currentWindowTuples, operatorContextId, currentWindowId); + + // we now have the data saved so we can complete the transaction + transaction.complete(); + + // delegate to sub-classes to emit the tuples + emitTuples(tuples); + + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + + /** + * Provides mechanism for converting a DataPacket to the given type. + * + * @param dataPacket a DataPacket from the NiFi Site-To-Site client. + * @return the given type of tuple + */ + protected abstract T createTuple(final DataPacket dataPacket) throws IOException; + + /** + * Provided mechanism to emit the list of tuples for follow-on processing. + * + * @param tuples a list of tuples received from NiFi. + */ + protected abstract void emitTuples(final List<T> tuples); + + @Override + public void endWindow() + { + // save the final state of the window and clear the current window list + try { + windowDataManager.save(currentWindowTuples, operatorContextId, currentWindowId); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + currentWindowTuples.clear(); + } + + @Override + public void teardown() + { + LOGGER.debug("Tearing down operator..."); + windowDataManager.teardown(); + try { + client.close(); + } catch (IOException e) { + throw new RuntimeException("Error closing SiteToSiteClient", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java new file mode 100644 index 0000000..e4e61fb --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi; + +import java.io.IOException; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; + +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.WindowDataManager; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * This is the base implementation of a NiFi output operator. + * A concrete operator should be created from this skeleton implementation. + * <p> + * <br> + * Ports:<br> + * <b>Input</b>: Can have any number of input ports<br> + * <b>Output</b>: no output port<br> + * <br> + * Properties:<br> + * None<br> + * <br> + * Compile time checks:<br> + * None<br> + * <br> + * Run time checks:<br> + * None<br> + * <br> + * Benchmarks:<br> + * TBD<br> + * <br> + * </p> + * + * @displayName Abstract NiFi Output + * @category Messaging + * @tags output operator + * + */ +public abstract class AbstractNiFiOutputOperator<T> extends BaseOperator +{ + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiOutputOperator.class); + + protected final SiteToSiteClient.Builder siteToSiteBuilder; + protected final NiFiDataPacketBuilder<T> dataPacketBuilder; + protected final WindowDataManager windowDataManager; + + protected transient SiteToSiteClient client; + + private transient int operatorContextId; + private transient long currentWindowId; + private transient long largestRecoveryWindowId; + protected transient boolean skipProcessingTuple = false; + + /** + * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient + * @param dataPacketBuilder a builder to produce NiFiDataPackets from incoming data + * @param windowDataManager a WindowDataManager to save and load state for windows of tuples + */ + public AbstractNiFiOutputOperator(final SiteToSiteClient.Builder siteToSiteBuilder, + final NiFiDataPacketBuilder<T> dataPacketBuilder, final WindowDataManager windowDataManager) + { + this.siteToSiteBuilder = siteToSiteBuilder; + this.dataPacketBuilder = dataPacketBuilder; + this.windowDataManager = windowDataManager; + } + + @Override + public void setup(Context.OperatorContext context) + { + this.client = siteToSiteBuilder.build(); + this.operatorContextId = context.getId(); + this.windowDataManager.setup(context); + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + largestRecoveryWindowId = windowDataManager.getLargestRecoveryWindow(); + + // if processing a window we've already seen, don't resend the tuples + if (currentWindowId <= largestRecoveryWindowId) { + skipProcessingTuple = true; + } else { + skipProcessingTuple = false; + } + } + + @Override + public void endWindow() + { + // if replaying then nothing to do + if (currentWindowId <= largestRecoveryWindowId) { + return; + } + + // if processing a new window then give sub-classes a chance to take action + endNewWindow(); + + // mark that we processed the window + try { + windowDataManager.save("processedWindow", operatorContextId, currentWindowId); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + + /** + * Called in endWindow() to give sub-classes a chance to take action when processing a new window. + * + * If the current window is <= the largest recovery window, this method will never be called. + */ + protected abstract void endNewWindow(); + + + @Override + public void teardown() + { + LOGGER.debug("Tearing down operator..."); + windowDataManager.teardown(); + try { + client.close(); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + + /** + * Send the given batch of tuples to NiFi in a transaction, using the provided builder to + * first convert each tuple into a NiFiDataPacket. + * + * @param tuples a list of tuples to process + */ + protected void processTuples(List<T> tuples) + { + if (tuples == null || tuples.size() == 0) { + return; + } + + // create a transaction and send the data packets + try { + final Transaction transaction = client.createTransaction(TransferDirection.SEND); + if (transaction == null) { + throw new IllegalStateException("Unable to create a NiFi Transaction to send data"); + } + + // convert each tuple to a NiFiDataPacket using the provided builder + for (T tuple : tuples) { + NiFiDataPacket dp = dataPacketBuilder.createNiFiDataPacket(tuple); + transaction.send(dp.getContent(), dp.getAttributes()); + } + + transaction.confirm(); + transaction.complete(); + } catch (IOException ioe) { + DTThrowable.rethrow(ioe); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java new file mode 100644 index 0000000..d874be0 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi; + +import java.util.List; + +import org.apache.nifi.remote.client.SiteToSiteClient; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.util.WindowDataManager; + +/** + * This is the base implementation of a NiFi input operator with a single output port. + * Subclasses should implement the methods which convert NiFi DataPackets to tuples. + * <p> + * <br> + * Ports:<br> + * <b>Input</b>: No input port<br> + * <b>Output</b>: Have only one output port<br> + * <br> + * Properties:<br> + * None<br> + * <br> + * Compile time checks:<br> + * Class derived from this has to implement the abstract method createTuple(DataPacket dp) <br> + * <br> + * Run time checks:<br> + * None<br> + * <br> + * Benchmarks:<br> + * TBD<br> + * <br> + * </p> + * + * @displayName Abstract NiFi Single Port Input + * @category Messaging + * @tags input operator + * + */ +public abstract class AbstractNiFiSinglePortInputOperator<T> extends AbstractNiFiInputOperator<T> +{ + + /** + * This is the output port on which tuples extracted from NiFi data packets are emitted. + */ + public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<>(); + + /** + * + * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient + * @param windowDataManager a WindowDataManager to save and load state for windows of tuples + */ + public AbstractNiFiSinglePortInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder, + final WindowDataManager windowDataManager) + { + super(siteToSiteBuilder, windowDataManager); + } + + @Override + protected void emitTuples(final List<T> tuples) + { + for (T tuple : tuples) { + outputPort.emit(tuple); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java new file mode 100644 index 0000000..9c66056 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi; + +import java.util.Map; + +/** + * <p> + * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both + * a FlowFile's content and its attributes so that they can be processed by Apex. + * </p> + */ +public interface NiFiDataPacket +{ + + /** + * @return the contents of a NiFi FlowFile + */ + byte[] getContent(); + + /** + * @return a Map of attributes that are associated with the NiFi FlowFile + */ + Map<String, String> getAttributes(); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java new file mode 100644 index 0000000..4b71792 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi; + +import java.io.Serializable; + +/** + * Creates a NiFiDataPacket from an incoming instance of the given type. + * + * @param <T> the type that a NiFiDataPacket is being created from + */ +public interface NiFiDataPacketBuilder<T> extends Serializable +{ + + NiFiDataPacket createNiFiDataPacket(T t); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java new file mode 100644 index 0000000..f80386d --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.stream.io.StreamUtils; + +import com.datatorrent.lib.util.WindowDataManager; + +/** + * Input adapter operator which consumes data from NiFi and produces NiFiDataPackets + * where each NiFiDataPacket contains a byte array of content and a Map of attributes. + * + * @displayName NiFi Input Operator + * @category Messaging + * @tags input operator + * + */ +public class NiFiSinglePortInputOperator extends AbstractNiFiSinglePortInputOperator<NiFiDataPacket> +{ + + // required by Kyro serialization + private NiFiSinglePortInputOperator() + { + super(null, null); + } + + /** + * + * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient + * @param windowDataManager a WindowDataManager to save and load state for windows of tuples + */ + public NiFiSinglePortInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder, + final WindowDataManager windowDataManager) + { + super(siteToSiteBuilder, windowDataManager); + } + + @Override + protected NiFiDataPacket createTuple(final DataPacket dataPacket) throws IOException + { + // read the data into a byte array and wrap it with the attributes into a NiFiDataPacket + final InputStream inStream = dataPacket.getData(); + final byte[] data = new byte[(int)dataPacket.getSize()]; + StreamUtils.fillBuffer(inStream, data); + + final Map<String, String> attributes = dataPacket.getAttributes(); + return new StandardNiFiDataPacket(data, attributes); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java new file mode 100644 index 0000000..2692034 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.remote.client.SiteToSiteClient; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.lib.util.WindowDataManager; + +/** + * NiFi output adapter operator with a single input port. Clients should provide a NiFiDataPacketBuilder implementation + * for converting incoming tuples to NiFiDataPackets. + * <p> + * <br> + * Ports:<br> + * <b>Input</b>: Have only one input port<br> + * <b>Output</b>: No output port<br> + * <br> + * Properties:<br> + * None<br> + * <br> + * Compile time checks:<br> + * None<br> + * <br> + * Run time checks:<br> + * None<br> + * <br> + * Benchmarks:<br> + * TBD<br> + * <br> + * </p> + * + * @displayName NiFi Single Port Output + * @category Messaging + * @tags output operator + * + */ +public class NiFiSinglePortOutputOperator<T> extends AbstractNiFiOutputOperator<T> +{ + + public final transient BufferingInputPort inputPort; + + // required by Kyro serialization + private NiFiSinglePortOutputOperator() + { + this(null, null, null, 0); + } + + /** + * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient + * @param dataPacketBuilder a builder to produce NiFiDataPackets from incoming data + * @param windowDataManager a WindowDataManager to save and load state for windows of tuples + * @param batchSize the maximum number of tuples to send to NiFi in a single transaction + */ + public NiFiSinglePortOutputOperator( + final SiteToSiteClient.Builder siteToSiteBuilder, + final NiFiDataPacketBuilder<T> dataPacketBuilder, + final WindowDataManager windowDataManager, + final int batchSize) + { + super(siteToSiteBuilder, dataPacketBuilder, windowDataManager); + this.inputPort = new BufferingInputPort(batchSize); + } + + @Override + protected void endNewWindow() + { + // flush any tuples that may have been buffered between the last flush and endWindow() + inputPort.flush(); + } + + /** + * An InputPort that accumulates tuples up to the provided batch size before flushing. + */ + public class BufferingInputPort extends DefaultInputPort<T> + { + + private final int batchSize; + private final List<T> tuples; + + public BufferingInputPort(final int batchSize) + { + this.tuples = new ArrayList<>(); + this.batchSize = batchSize; + } + + @Override + public void process(T tuple) + { + if (!skipProcessingTuple) { + tuples.add(tuple); + + if (tuples.size() >= batchSize) { + flush(); + } + } + } + + public void flush() + { + processTuples(tuples); + tuples.clear(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java new file mode 100644 index 0000000..6821adf --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi; + +import java.io.Serializable; +import java.util.Map; + +/** + * An implementation of NiFiDataPacket. + */ +public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable +{ + private static final long serialVersionUID = 6364005260220243322L; + + private final byte[] content; + private final Map<String, String> attributes; + + // required by Kyro serialization + public StandardNiFiDataPacket() + { + this.content = null; + this.attributes = null; + } + + public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) + { + this.content = content; + this.attributes = attributes; + } + + @Override + public byte[] getContent() + { + return content; + } + + @Override + public Map<String, String> getAttributes() + { + return attributes; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java new file mode 100644 index 0000000..a25497b --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.jetbrains.annotations.NotNull; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.file.FileUtils; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.contrib.nifi.mock.MockDataPacket; +import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.WindowDataManager; + +public class NiFiSinglePortInputOperatorTest +{ + + private MockSiteToSiteClient.Builder builder; + private CollectorTestSink<Object> sink; + private Context.OperatorContext context; + private WindowDataManager windowDataManager; + private NiFiSinglePortInputOperator operator; + + @Before + public void setup() throws IOException + { + final String windowDataDir = "target/" + this.getClass().getSimpleName(); + final File windowDataDirFile = new File(windowDataDir); + if (windowDataDirFile.exists()) { + FileUtils.deleteFile(windowDataDirFile, true); + } + + Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_PATH, windowDataDir); + + context = new OperatorContextTestHelper.TestIdOperatorContext(12345, attributeMap); + + sink = new CollectorTestSink<>(); + builder = new MockSiteToSiteClient.Builder(); + windowDataManager = new WindowDataManager.FSWindowDataManager(); + + operator = new NiFiSinglePortInputOperator(builder, windowDataManager); + operator.outputPort.setSink(sink); + } + + @After + public void teardown() { + if (operator != null) { + operator.teardown(); + } + } + + @Test + public void testSimpleInput() throws IOException + { + // create some mock packets and queue them in the builder before running the operator + final List<DataPacket> dataPackets = getDataPackets(4); + builder.queue(dataPackets); + + operator.setup(context); + operator.beginWindow(1); + operator.emitTuples(); + operator.endWindow(); + + Assert.assertEquals("Size of collected tuples should equal size of mock data packets", + dataPackets.size(), sink.collectedTuples.size()); + + operator.beginWindow(2); + operator.emitTuples(); + operator.endWindow(); + + Assert.assertEquals("Size of collected tuples should still equal size of mock data packets", + dataPackets.size(), sink.collectedTuples.size()); + + // verify that the collector sink got all the expected content + List<String> expectedContents = Arrays.asList("content1", "content2", "content3", "content4"); + verifyContents(expectedContents, sink.collectedTuples); + + // reinitialize the data manager so it picks up the saved data + windowDataManager.setup(context); + + // verify that all the data packets were saved for window #1 + List<StandardNiFiDataPacket> windowData = (List<StandardNiFiDataPacket>) windowDataManager.load(context.getId(), 1); + Assert.assertNotNull("Should have recovered data", windowData); + Assert.assertEquals("Size of recovered data should equal size of mock data packets", + dataPackets.size(), windowData.size()); + } + + @Test + public void testRecoveryAndIdempotency() + { + // create some mock packets and queue them in the builder before running the operator + final List<DataPacket> dataPackets = getDataPackets(4); + builder.queue(dataPackets); + + operator.setup(context); + operator.beginWindow(1); + operator.emitTuples(); + operator.endWindow(); + + Assert.assertEquals("Size of collected tuples should equal size of mock data packets", + dataPackets.size(), sink.collectedTuples.size()); + + // simulate failure and then re-deployment of operator + + sink.collectedTuples.clear(); + Assert.assertEquals("Should not have collected tuples", 0, sink.collectedTuples.size()); + + operator.setup(context); + operator.beginWindow(1); + operator.emitTuples(); + operator.endWindow(); + + Assert.assertEquals("Size of collected tuples should equal size of mock data packets", + dataPackets.size(), sink.collectedTuples.size()); + } + + @NotNull + private List<DataPacket> getDataPackets(int size) + { + List<DataPacket> dataPackets = new ArrayList<>(); + + for (int i=1; i <= size; i++) { + dataPackets.add(getDataPacket(String.valueOf(i))); + } + return dataPackets; + } + + @NotNull + private DataPacket getDataPacket(final String id) + { + Map<String, String> attrs = new HashMap<>(); + attrs.put("keyA", "valA"); + attrs.put("keyB", "valB"); + attrs.put("key" + id, "val" + id); + + byte[] content = ("content" + id).getBytes(StandardCharsets.UTF_8); + ByteArrayInputStream in = new ByteArrayInputStream(content); + + return new MockDataPacket(attrs, in, content.length); + } + + private void verifyContents(List<String> expectedContents, List<Object> tuples) + { + for (String expectedContent : expectedContents) { + boolean found = false; + + for (Object obj : tuples) { + if (obj instanceof NiFiDataPacket) { + NiFiDataPacket dp = (NiFiDataPacket)obj; + Assert.assertEquals(3, dp.getAttributes().size()); + + String content = new String(dp.getContent(), StandardCharsets.UTF_8); + if (content.equals(expectedContent)) { + found = true; + break; + } + } + } + + Assert.assertTrue(found); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java new file mode 100644 index 0000000..5b58ae0 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.util.file.FileUtils; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient; +import com.datatorrent.contrib.nifi.mock.MockTransaction; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.WindowDataManager; + +public class NiFiSinglePortOutputOperatorTest +{ + + private Context.OperatorContext context; + private WindowDataManager windowDataManager; + private MockSiteToSiteClient.Builder stsBuilder; + private NiFiDataPacketBuilder<String> dpBuilder; + private NiFiSinglePortOutputOperator<String> operator; + + @Before + public void setup() throws IOException + { + final String windowDataDir = "target/" + this.getClass().getSimpleName(); + final File windowDataDirFile = new File(windowDataDir); + if (windowDataDirFile.exists()) { + FileUtils.deleteFile(windowDataDirFile, true); + } + + Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_PATH, windowDataDir); + + context = new OperatorContextTestHelper.TestIdOperatorContext(12345, attributeMap); + + windowDataManager = new WindowDataManager.FSWindowDataManager(); + + stsBuilder = new MockSiteToSiteClient.Builder(); + dpBuilder = new StringNiFiDataPacketBuilder(); + operator = new NiFiSinglePortOutputOperator(stsBuilder, dpBuilder, windowDataManager, 1); + } + + @Test + public void testTransactionPerTuple() throws IOException + { + operator.setup(context); + + // get the mock client which will capture each transactions + final MockSiteToSiteClient mockClient = (MockSiteToSiteClient)operator.client; + + final String tuple1 = "tuple1"; + final String tuple2 = "tuple2"; + final String tuple3 = "tuple3"; + + operator.beginWindow(1); + + operator.inputPort.process(tuple1); + Assert.assertEquals(1, mockClient.getMockTransactions().size()); + + operator.inputPort.process(tuple2); + Assert.assertEquals(2, mockClient.getMockTransactions().size()); + + operator.inputPort.process(tuple3); + Assert.assertEquals(3, mockClient.getMockTransactions().size()); + + operator.endNewWindow(); + Assert.assertEquals(3, mockClient.getMockTransactions().size()); + + // verify we sent the correct content + List<String> expectedContents = Arrays.asList(tuple1, tuple2, tuple3); + List<MockTransaction> transactions = mockClient.getMockTransactions(); + + verifyTransactions(expectedContents, transactions); + } + + @Test + public void testBatchSize() throws IOException + { + final int batchSize = 3; + operator = new NiFiSinglePortOutputOperator(stsBuilder, dpBuilder, windowDataManager, batchSize); + operator.setup(context); + + // get the mock client which will capture each transactions + final MockSiteToSiteClient mockClient = (MockSiteToSiteClient)operator.client; + + final String tuple1 = "tuple1"; + final String tuple2 = "tuple2"; + final String tuple3 = "tuple3"; + final String tuple4 = "tuple4"; + final String tuple5 = "tuple5"; + + operator.beginWindow(1); + + operator.inputPort.process(tuple1); + Assert.assertEquals(0, mockClient.getMockTransactions().size()); + + operator.inputPort.process(tuple2); + Assert.assertEquals(0, mockClient.getMockTransactions().size()); + + // should cause the port to flush and create a transaction + operator.inputPort.process(tuple3); + Assert.assertEquals(1, mockClient.getMockTransactions().size()); + + operator.inputPort.process(tuple4); + Assert.assertEquals(1, mockClient.getMockTransactions().size()); + + operator.inputPort.process(tuple5); + Assert.assertEquals(1, mockClient.getMockTransactions().size()); + + // should flush tuples 4 and 5 and cause a new transaction + operator.endNewWindow(); + Assert.assertEquals(2, mockClient.getMockTransactions().size()); + + // verify we sent the correct content + List<String> expectedContents = Arrays.asList(tuple1, tuple2, tuple3, tuple4, tuple5); + List<MockTransaction> transactions = mockClient.getMockTransactions(); + + verifyTransactions(expectedContents, transactions); + } + + @Test + public void testReplay() throws IOException + { + final String tuple1 = "tuple1"; + final String tuple2 = "tuple2"; + final String tuple3 = "tuple3"; + + operator.setup(context); + operator.beginWindow(1); + operator.inputPort.process(tuple1); + operator.inputPort.process(tuple2); + operator.inputPort.process(tuple3); + operator.endWindow(); + + // get the mock client which will capture each transactions + MockSiteToSiteClient mockClient = (MockSiteToSiteClient)operator.client; + Assert.assertEquals(3, mockClient.getMockTransactions().size()); + + // simulate replaying window #1 + operator.setup(context); + operator.beginWindow(1); + operator.inputPort.process(tuple1); + operator.inputPort.process(tuple2); + operator.inputPort.process(tuple3); + operator.endWindow(); + + // should not have created any transactions on the new client + mockClient = (MockSiteToSiteClient)operator.client; + Assert.assertEquals(0, mockClient.getMockTransactions().size()); + } + + + private void verifyTransactions(List<String> expectedContents, List<MockTransaction> transactions) throws IOException + { + // convert all the data packets in the transactions to strings + final List<String> dataPacketContents = new ArrayList<>(); + + for (MockTransaction mockTransaction : transactions) + { + List<DataPacket> dps = mockTransaction.getSentDataPackets(); + Assert.assertTrue(dps.size() > 0); + + for (DataPacket dp : dps) + { + final String dpContent = IOUtils.toString(dp.getData()); + dataPacketContents.add(dpContent); + } + } + + // verify each expected piece of content is found in the data packet contents + for (String expectedContent : expectedContents) + { + boolean found = false; + for (String dataPacket : dataPacketContents) + { + if (dataPacket.equals(expectedContent)) + { + found = true; + break; + } + } + Assert.assertTrue(found); + } + } + + /** + * A builder that can create a NiFiDataPacket from a string. + */ + public static class StringNiFiDataPacketBuilder implements NiFiDataPacketBuilder<String> + { + @Override + public NiFiDataPacket createNiFiDataPacket(String s) + { + return new StandardNiFiDataPacket(s.getBytes(StandardCharsets.UTF_8), new HashMap<String, String>()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java new file mode 100644 index 0000000..ebe8a0c --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi.demo; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.contrib.nifi.NiFiSinglePortInputOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.util.WindowDataManager; + +/** + * A sample application that shows how to receive data to a NiFi Output Port. + */ +public class TestNiFiInputApplication implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("Apex") + .requestBatchCount(5) + .buildConfig(); + + final SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder().fromConfig(clientConfig); + + final WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager(); + + NiFiSinglePortInputOperator nifi = dag.addOperator("nifi", new NiFiSinglePortInputOperator(builder, windowDataManager)); + ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); + dag.addStream("nifi_console", nifi.outputPort, console.input).setLocality(null); + } + + public static void main(String[] args) throws Exception + { + StreamingApplication app = new TestNiFiInputApplication(); + LocalMode.runApp(app, new Configuration(false), 10000); + Thread.sleep(2000); + System.exit(0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java new file mode 100644 index 0000000..f5399e7 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi.demo; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.contrib.nifi.NiFiDataPacket; +import com.datatorrent.contrib.nifi.NiFiDataPacketBuilder; +import com.datatorrent.contrib.nifi.NiFiSinglePortOutputOperator; +import com.datatorrent.contrib.nifi.StandardNiFiDataPacket; +import com.datatorrent.lib.testbench.RandomEventGenerator; +import com.datatorrent.lib.util.WindowDataManager; + +/** + * A sample application that shows how to send data to a NiFi Input Port. + */ +public class TestNiFiOutputApplication implements StreamingApplication +{ + + /** + * A builder that can create a NiFiDataPacket from a string. + */ + public static class StringNiFiDataPacketBuilder implements NiFiDataPacketBuilder<String> + { + @Override + public NiFiDataPacket createNiFiDataPacket(String s) + { + return new StandardNiFiDataPacket(s.getBytes(StandardCharsets.UTF_8), new HashMap<String, String>()); + } + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("Apex") + .buildConfig(); + + final int batchSize = 1; + final SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder().fromConfig(clientConfig); + final NiFiDataPacketBuilder<String> dataPacketBuilder = new StringNiFiDataPacketBuilder(); + final WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager(); + + RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); + + NiFiSinglePortOutputOperator nifi = dag.addOperator("nifi", + new NiFiSinglePortOutputOperator(builder, dataPacketBuilder, windowDataManager ,batchSize)); + + dag.addStream("rand_nifi", rand.string_data, nifi.inputPort).setLocality(null); + } + + public static void main(String[] args) throws Exception + { + StreamingApplication app = new TestNiFiOutputApplication(); + LocalMode.runApp(app, new Configuration(false), 10000); + Thread.sleep(2000); + System.exit(0); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java new file mode 100644 index 0000000..a57a771 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi.mock; + +import java.io.InputStream; +import java.util.Map; + +import org.apache.nifi.remote.protocol.DataPacket; + +public class MockDataPacket implements DataPacket +{ + + private final Map<String, String> attributes; + private final InputStream inputStream; + private final long size; + + public MockDataPacket(Map<String, String> attributes, InputStream inputStream, long size) + { + this.attributes = attributes; + this.inputStream = inputStream; + this.size = size; + } + + @Override + public Map<String, String> getAttributes() + { + return attributes; + } + + @Override + public InputStream getData() + { + return inputStream; + } + + @Override + public long getSize() + { + return size; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java new file mode 100644 index 0000000..ee8fc55 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi.mock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.protocol.DataPacket; + +public class MockSiteToSiteClient implements SiteToSiteClient +{ + + private final SiteToSiteClientConfig config; + private final List<DataPacket> queuedDataPackets; + private final Iterator<DataPacket> iter; + private final List<MockTransaction> transactions; + + public MockSiteToSiteClient(final MockSiteToSiteClient.Builder builder) + { + this.config = builder.buildConfig(); + this.queuedDataPackets = (builder.queuedDataPackets == null ? + new ArrayList<DataPacket>() : builder.queuedDataPackets); + this.iter = queuedDataPackets.iterator(); + this.transactions = new ArrayList<>(); + } + + @Override + public Transaction createTransaction(TransferDirection direction) throws IOException + { + MockTransaction transaction = new MockTransaction(iter); + transactions.add(transaction); + return transaction; + } + + @Override + public boolean isSecure() throws IOException + { + return false; + } + + @Override + public SiteToSiteClientConfig getConfig() + { + return config; + } + + @Override + public void close() throws IOException + { + // nothing to do + } + + public List<MockTransaction> getMockTransactions() + { + return Collections.unmodifiableList(transactions); + } + + /** + * A builder for MockSiteToSiteClients. + */ + public static class Builder extends SiteToSiteClient.Builder + { + + private List<DataPacket> queuedDataPackets; + + public MockSiteToSiteClient.Builder queue(List<DataPacket> queuedDataPackets) + { + this.queuedDataPackets = new ArrayList<>(queuedDataPackets); + return this; + } + + @Override + public SiteToSiteClient build() + { + return new MockSiteToSiteClient(this); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java new file mode 100644 index 0000000..373f62b --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.nifi.mock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.remote.Communicant; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransactionCompletion; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.stream.io.ByteArrayInputStream; + +/** + * A mock Transaction that will return the data passed in the constructor on calls to receive(), + * and will store the data packets passed to send(). + */ +public class MockTransaction implements Transaction +{ + + private Iterator<DataPacket> dataPacketIter; + + private List<DataPacket> sentDataPackets = new ArrayList<>(); + + public MockTransaction(Iterator<DataPacket> iter) + { + this.dataPacketIter = iter; + } + + @Override + public void send(DataPacket dataPacket) throws IOException + { + if (dataPacket != null) { + this.sentDataPackets.add(dataPacket); + } + } + + @Override + public void send(byte[] content, Map<String, String> attributes) throws IOException + { + this.sentDataPackets.add(new MockDataPacket(attributes, new ByteArrayInputStream(content), content.length)); + } + + public List<DataPacket> getSentDataPackets() + { + return Collections.unmodifiableList(sentDataPackets); + } + + @Override + public DataPacket receive() throws IOException + { + if (dataPacketIter != null && dataPacketIter.hasNext()) { + return dataPacketIter.next(); + } else { + return null; + } + } + + @Override + public void confirm() throws IOException + { + + } + + @Override + public TransactionCompletion complete() throws IOException + { + return new TransactionCompletion() + { + @Override + public boolean isBackoff() + { + return false; + } + + @Override + public int getDataPacketsTransferred() + { + return 0; + } + + @Override + public long getBytesTransferred() + { + return 0; + } + + @Override + public long getDuration(TimeUnit timeUnit) + { + return 0; + } + }; + } + + @Override + public void cancel(String explanation) throws IOException + { + + } + + @Override + public void error() + { + + } + + @Override + public TransactionState getState() throws IOException + { + return TransactionState.TRANSACTION_COMPLETED; + } + + @Override + public Communicant getCommunicant() + { + return new Communicant() + { + @Override + public String getUrl() + { + return null; + } + + @Override + public String getHost() + { + return null; + } + + @Override + public int getPort() + { + return 0; + } + + @Override + public String getDistinguishedName() + { + return null; + } + }; + } + +}
