Repository: apex-malhar Updated Branches: refs/heads/master dd80369d4 -> 3b8135061
APEXMALHAR-2431 Create Kinesis Input operator which emits byte array as a tuple Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3b813506 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3b813506 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3b813506 Branch: refs/heads/master Commit: 3b813506197ac95cff047322a20f123323c174d8 Parents: dd80369 Author: deepak-narkhede <[email protected]> Authored: Wed Apr 5 13:51:58 2017 +0530 Committer: deepak-narkhede <[email protected]> Committed: Mon Apr 17 14:20:38 2017 +0530 ---------------------------------------------------------------------- .../kinesis/KinesisByteArrayInputOperator.java | 50 ++++++++++++++++++++ .../kinesis/KinesisInputOperatorTest.java | 49 +++++++++++++++++++ 2 files changed, 99 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3b813506/contrib/src/main/java/com/datatorrent/contrib/kinesis/KinesisByteArrayInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/KinesisByteArrayInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/KinesisByteArrayInputOperator.java new file mode 100644 index 0000000..04e80b6 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/KinesisByteArrayInputOperator.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.kinesis; + +import java.nio.ByteBuffer; + +import com.amazonaws.services.kinesis.model.Record; + +/** + * Kinesis input adapter which consumes records from kinesis streams and emits in ByteArray form. + * + * @category Input + * @tags Kinesis, input, ByteArray + */ + +public class KinesisByteArrayInputOperator extends AbstractKinesisInputOperator<byte[]> +{ + /** + * Implement abstract method of AbstractKinesisInputOperator + */ + @Override + public byte[] getTuple(Record record) + { + try { + ByteBuffer bb = record.getData(); + byte[] bytes = new byte[bb.remaining()]; + bb.get(bytes); + return bytes; + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3b813506/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java index faffbda..a0eb042 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java @@ -182,6 +182,55 @@ public class KinesisInputOperatorTest extends KinesisOperatorTestBase lc.shutdown(); } + @Test + public void testKinesisByteArrayInputOperator() throws Exception + { + int totalCount = 10; + // initial the latch for this test + latch = new CountDownLatch(1); + + // Start producer + KinesisTestProducer p = new KinesisTestProducer(streamName); + p.setSendCount(totalCount); + p.setBatchSize(9); + new Thread(p).start(); + + // Create DAG for testing. + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + // Create KinesisByteArrayInputOperator and set some properties with respect to consumer. + KinesisByteArrayInputOperator node = dag.addOperator("Kinesis message consumer", KinesisByteArrayInputOperator.class); + node.setAccessKey(credentials.getCredentials().getAWSSecretKey()); + node.setSecretKey(credentials.getCredentials().getAWSAccessKeyId()); + KinesisConsumer consumer = new KinesisConsumer(); + consumer.setStreamName(streamName); + consumer.setRecordsLimit(totalCount); + node.setConsumer(consumer); + + // Create Test tuple collector + CollectorModule<byte[]> collector = dag.addOperator("TestMessageCollector", new CollectorModule<byte[]>()); + + // Connect ports + dag.addStream("Kinesis message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + + lc.runAsync(); + + // Wait 45s for consumer finish consuming all the messages + latch.await(45000, TimeUnit.MILLISECONDS); + + // Check results + Assert.assertEquals("Collections size", 1, collections.size()); + Assert.assertEquals("Tuple count", totalCount, collections.get(collector.inputPort.id).size()); + logger.debug(String.format("Number of emitted tuples: %d", collections.get(collector.inputPort.id).size())); + + lc.shutdown(); + } + @Override @After public void afterTest()
