Repository: bahir-flink Updated Branches: refs/heads/master 2d1225f9a -> a830077a9
[BAHIR-73][bahir-flink] flink-streaming-akka source connector This closes #8 Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/a830077a Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/a830077a Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/a830077a Branch: refs/heads/master Commit: a830077a9b8145d5fb0718619b7055c8d7d1b14e Parents: 2d1225f Author: Subhobrata Dey <sbc...@gmail.com> Authored: Tue Oct 25 16:21:07 2016 -0400 Committer: Robert Metzger <rmetz...@apache.org> Committed: Thu Nov 24 09:51:22 2016 +0100 ---------------------------------------------------------------------- flink-connector-akka/README.md | 37 +++ flink-connector-akka/pom.xml | 85 ++++++ .../streaming/connectors/akka/AkkaSource.java | 147 +++++++++++ .../streaming/connectors/akka/package-info.java | 21 ++ .../connectors/akka/utils/ReceiverActor.java | 110 ++++++++ .../akka/utils/SubscribeReceiver.java | 58 +++++ .../akka/utils/UnsubscribeReceiver.java | 58 +++++ .../connectors/akka/AkkaSourceTest.java | 256 +++++++++++++++++++ .../connectors/akka/utils/FeederActor.java | 99 +++++++ .../connectors/akka/utils/Message.java | 25 ++ .../src/test/resources/feeder_actor.conf | 33 +++ .../src/test/resources/log4j.properties | 27 ++ pom.xml | 1 + 13 files changed, 957 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/README.md ---------------------------------------------------------------------- diff --git a/flink-connector-akka/README.md b/flink-connector-akka/README.md new file mode 100644 index 0000000..b3b64e5 --- /dev/null +++ b/flink-connector-akka/README.md @@ -0,0 +1,37 @@ +# Flink Akka connector + +This connector provides a sink to [Akka](http://akka.io/) source actors in an ActorSystem. +To use this connector, add the following dependency to your project: + + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>flink-connector-akka_2.11</artifactId> + <version>1.0.0-SNAPSHOT</version> + </dependency> + +*Version Compatibility*: This module is compatible with Akka 2.0+. + +## Configuration + +The configurations for the Receiver Actor System in Flink Akka connector can be created using the standard typesafe `Config (com.typesafe.config.Config)` object. + +To enable acknowledgements, the custom configuration `akka.remote.auto-ack` can be used. + +The user can set any of the default configurations allowed by Akka as well as custom configurations allowed by the connector. + +A sample configuration can be defined as follows: + + String configFile = getClass().getClassLoader() + .getResource("feeder_actor.conf").getFile(); + Config config = ConfigFactory.parseFile(new File(configFile)); + +## Message Types + +There are 3 different kind of message types which the receiver Actor in flink akka connector can receive. + +- message containing `Iterable<Object>` data + +- message containing generic `Object` data + +- message containing generic `Object` data and a `Timestamp` value passed as `Tuple2<Object, Long>`. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-akka/pom.xml b/flink-connector-akka/pom.xml new file mode 100644 index 0000000..df82563 --- /dev/null +++ b/flink-connector-akka/pom.xml @@ -0,0 +1,85 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-flink_parent_2.11</artifactId> + <version>1.0.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>flink-connector-akka_2.11</artifactId> + <name>flink-connector-akka</name> + <url>http://bahir.apache.org/</url> + <packaging>jar</packaging> + + <properties> + <scala.binary.version>2.11</scala.binary.version> + <mockito.version>1.10.19</mockito.version> + <akka.version>2.3.15</akka.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-actor_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-remote_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito.version}</version> + <type>jar</type> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java new file mode 100644 index 0000000..3925d0b --- /dev/null +++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; + +/** + * Implementation of {@link SourceFunction} specialized to read messages + * from Akka actors. + */ +public class AkkaSource extends RichSourceFunction<Object> + implements StoppableFunction { + + private static final Logger LOG = LoggerFactory.getLogger(AkkaSource.class); + + private static final long serialVersionUID = 1L; + + // --- Fields set by the constructor + + private final Class<?> classForActor; + + private final String actorName; + + private final String urlOfPublisher; + + private final Config configuration; + + // --- Runtime fields + private transient ActorSystem receiverActorSystem; + private transient ActorRef receiverActor; + + protected transient boolean autoAck; + + /** + * Creates {@link AkkaSource} for Streaming + * + * @param actorName Receiver Actor name + * @param urlOfPublisher tcp url of the publisher or feeder actor + */ + public AkkaSource(String actorName, + String urlOfPublisher, + Config configuration) { + super(); + this.classForActor = ReceiverActor.class; + this.actorName = actorName; + this.urlOfPublisher = urlOfPublisher; + this.configuration = configuration; + } + + @Override + public void open(Configuration parameters) throws Exception { + receiverActorSystem = createDefaultActorSystem(); + + if (configuration.hasPath("akka.remote.auto-ack") && + configuration.getString("akka.remote.auto-ack").equals("on")) { + autoAck = true; + } else { + autoAck = false; + } + } + + @Override + public void run(SourceFunction.SourceContext<Object> ctx) throws Exception { + LOG.info("Starting the Receiver actor {}", actorName); + receiverActor = receiverActorSystem.actorOf( + Props.create(classForActor, ctx, urlOfPublisher, autoAck), actorName); + + LOG.info("Started the Receiver actor {} successfully", actorName); + receiverActorSystem.awaitTermination(); + } + + @Override + public void close() { + LOG.info("Closing source"); + if (receiverActorSystem != null) { + receiverActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + receiverActorSystem.shutdown(); + } + } + + @Override + public void cancel() { + LOG.info("Cancelling akka source"); + close(); + } + + @Override + public void stop() { + LOG.info("Stopping akka source"); + close(); + } + + /** + * Creates an actor system with default configurations for Receiver actor. + * + * @return Actor System instance with default configurations + */ + private ActorSystem createDefaultActorSystem() { + String defaultActorSystemName = "receiver-actor-system"; + + Config finalConfig = getOrCreateMandatoryProperties(configuration); + + return ActorSystem.create(defaultActorSystemName, finalConfig); + } + + private Config getOrCreateMandatoryProperties(Config properties) { + if (!properties.hasPath("akka.actor.provider")) { + properties = properties.withValue("akka.actor.provider", + ConfigValueFactory.fromAnyRef("akka.remote.RemoteActorRefProvider")); + } + + if (!properties.hasPath("akka.remote.enabled-transports")) { + properties = properties.withValue("akka.remote.enabled-transports", + ConfigValueFactory.fromAnyRef(Collections.singletonList("akka.remote.netty.tcp"))); + } + return properties; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java new file mode 100644 index 0000000..ecea4c4 --- /dev/null +++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Akka receiver for Flink Streaming. + */ +package org.apache.flink.streaming.connectors.akka; http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java new file mode 100644 index 0000000..09913a0 --- /dev/null +++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka.utils; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.UntypedActor; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; + +import java.util.Iterator; + +/** + * Generalized receiver actor which receives messages + * from the feeder or publisher actor. + */ +public class ReceiverActor extends UntypedActor { + // --- Fields set by the constructor + private final SourceContext<Object> ctx; + + private final String urlOfPublisher; + + private final boolean autoAck; + + // --- Runtime fields + private ActorSelection remotePublisher; + + public ReceiverActor(SourceContext<Object> ctx, + String urlOfPublisher, + boolean autoAck) { + this.ctx = ctx; + this.urlOfPublisher = urlOfPublisher; + this.autoAck = autoAck; + } + + @Override + public void preStart() throws Exception { + remotePublisher = getContext().actorSelection(urlOfPublisher); + remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf()); + } + + @SuppressWarnings("unchecked") + @Override + public void onReceive(Object message) + throws Exception { + if (message instanceof Iterable) { + collect((Iterable<Object>) message); + } else if (message instanceof Tuple2) { + Tuple2<Object, Long> messageTuple = (Tuple2<Object, Long>) message; + collect(messageTuple.f0, messageTuple.f1); + } else { + collect(message); + } + + if (autoAck) { + getSender().tell("ack", getSelf()); + } + } + + /** + * To handle {@link Iterable} data + * + * @param data data received from feeder actor + */ + private void collect(Iterable<Object> data) { + Iterator<Object> iterator = data.iterator(); + while (iterator.hasNext()) { + ctx.collect(iterator.next()); + } + } + + /** + * To handle single data + * @param data data received from feeder actor + */ + private void collect(Object data) { + ctx.collect(data); + } + + /** + * To handle data with timestamp + * + * @param data data received from feeder actor + * @param timestamp timestamp received from feeder actor + */ + private void collect(Object data, long timestamp) { + ctx.collectWithTimestamp(data, timestamp); + } + + @Override + public void postStop() throws Exception { + remotePublisher.tell(new UnsubscribeReceiver(ActorRef.noSender()), + ActorRef.noSender()); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java new file mode 100644 index 0000000..735e3e9 --- /dev/null +++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka.utils; + +import akka.actor.ActorRef; + +import java.io.Serializable; + +/** + * General interface used by Receiver Actor to subscribe + * to the publisher. + */ +public class SubscribeReceiver implements Serializable { + private static final long serialVersionUID = 1L; + private ActorRef receiverActor; + + public SubscribeReceiver(ActorRef receiverActor) { + this.receiverActor = receiverActor; + } + + public void setReceiverActor(ActorRef receiverActor) { + this.receiverActor = receiverActor; + } + + public ActorRef getReceiverActor() { + return receiverActor; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SubscribeReceiver) { + SubscribeReceiver other = (SubscribeReceiver) obj; + return other.canEquals(this) && super.equals(other) + && receiverActor.equals(other.getReceiverActor()); + } else { + return false; + } + } + + public boolean canEquals(Object obj) { + return obj instanceof SubscribeReceiver; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java new file mode 100644 index 0000000..8c6dde2 --- /dev/null +++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka.utils; + +import akka.actor.ActorRef; + +import java.io.Serializable; + +/** + * General interface used by Receiver Actor to un subscribe. + */ +public class UnsubscribeReceiver implements Serializable { + private static final long serialVersionUID = 1L; + private ActorRef receiverActor; + + public UnsubscribeReceiver(ActorRef receiverActor) { + this.receiverActor = receiverActor; + } + + public void setReceiverActor(ActorRef receiverActor) { + this.receiverActor = receiverActor; + } + + public ActorRef getReceiverActor() { + return receiverActor; + } + + + @Override + public boolean equals(Object obj) { + if (obj instanceof UnsubscribeReceiver) { + UnsubscribeReceiver other = (UnsubscribeReceiver) obj; + return other.canEquals(this) && super.equals(other) + && receiverActor.equals(other.getReceiverActor()); + } else { + return false; + } + } + + public boolean canEquals(Object obj) { + return obj instanceof UnsubscribeReceiver; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java new file mode 100644 index 0000000..e7114d7 --- /dev/null +++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.akka.utils.FeederActor; +import org.apache.flink.streaming.connectors.akka.utils.Message; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +public class AkkaSourceTest { + private AkkaSource source; + + private static final String feederActorName = "JavaFeederActor"; + private static final String receiverActorName = "receiverActor"; + private static final String urlOfFeeder = + "akka.tcp://feederActorSystem@127.0.0.1:5150/user/" + feederActorName; + private ActorSystem feederActorSystem; + + private Configuration config = new Configuration(); + private Config sourceConfiguration = ConfigFactory.empty(); + + private Thread sourceThread; + + private SourceFunction.SourceContext<Object> sourceContext; + + private volatile Exception exception; + + @Before + public void beforeTest() throws Exception { + feederActorSystem = ActorSystem.create("feederActorSystem", + getFeederActorConfig()); + + sourceContext = new DummySourceContext(); + + sourceThread = new Thread(new Runnable() { + @Override + public void run() { + try { + SourceFunction.SourceContext<Object> sourceContext = + new DummySourceContext(); + source.run(sourceContext); + } catch (Exception e) { + exception = e; + } + } + }); + } + + @After + public void afterTest() throws Exception { + feederActorSystem.shutdown(); + feederActorSystem.awaitTermination(); + + source.cancel(); + sourceThread.join(); + } + + @Test + public void testWithSingleData() throws Exception { + source = new AkkaTestSource(sourceConfiguration); + + feederActorSystem.actorOf( + Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA), + feederActorName); + + source.autoAck = false; + source.open(config); + sourceThread.start(); + + while (DummySourceContext.numElementsCollected != 1) { + Thread.sleep(5); + } + List<Object> message = DummySourceContext.message; + Assert.assertEquals(message.get(0).toString(), Message.WELCOME_MESSAGE); + } + + @Test + public void testWithIterableData() throws Exception { + source = new AkkaTestSource(sourceConfiguration); + + feederActorSystem.actorOf( + Props.create(FeederActor.class, FeederActor.MessageTypes.ITERABLE_DATA), + feederActorName); + + source.autoAck = false; + source.open(config); + sourceThread.start(); + + while (DummySourceContext.numElementsCollected != 2) { + Thread.sleep(5); + } + + List<Object> messages = DummySourceContext.message; + Assert.assertEquals(messages.get(0).toString(), Message.WELCOME_MESSAGE); + Assert.assertEquals(messages.get(1).toString(), Message.FEEDER_MESSAGE); + } + + @Test + public void testWithByteArrayData() throws Exception { + source = new AkkaTestSource(sourceConfiguration); + + feederActorSystem.actorOf( + Props.create(FeederActor.class, FeederActor.MessageTypes.BYTES_DATA), + feederActorName); + + source.autoAck = false; + source.open(config); + sourceThread.start(); + + while (DummySourceContext.numElementsCollected != 1) { + Thread.sleep(5); + } + + List<Object> message = DummySourceContext.message; + if (message.get(0) instanceof byte[]) { + byte[] data = (byte[]) message.get(0); + Assert.assertEquals(new String(data), Message.WELCOME_MESSAGE); + } + } + + @Test + public void testWithSingleDataWithTimestamp() throws Exception { + source = new AkkaTestSource(sourceConfiguration); + + feederActorSystem.actorOf( + Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA_WITH_TIMESTAMP), + feederActorName); + + source.autoAck = false; + source.open(config); + sourceThread.start(); + + while (DummySourceContext.numElementsCollected != 1) { + Thread.sleep(5); + } + + List<Object> message = DummySourceContext.message; + Assert.assertEquals(message.get(0).toString(), Message.WELCOME_MESSAGE); + } + + @Test + public void testAcksWithSingleData() throws Exception { + sourceConfiguration = sourceConfiguration.withValue("akka.remote.auto-ack", + ConfigValueFactory.fromAnyRef("on")); + source = new AkkaTestSource(sourceConfiguration); + + feederActorSystem.actorOf( + Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA), + feederActorName); + + source.open(config); + sourceThread.start(); + + while (DummySourceContext.numElementsCollected != 1) { + Thread.sleep(5); + } + + int noOfRetries = 1; + while (Message.ACK_MESSAGE == null && noOfRetries <= 5) { + Thread.sleep(5); + noOfRetries++; + } + Assert.assertEquals("ack", Message.ACK_MESSAGE); + } + + private class AkkaTestSource extends AkkaSource { + + private AkkaTestSource(Config sourceConfig) { + super(receiverActorName, urlOfFeeder, sourceConfig); + } + + @Override + public RuntimeContext getRuntimeContext() { + return Mockito.mock(StreamingRuntimeContext.class); + } + } + + private static class DummySourceContext implements SourceFunction.SourceContext<Object> { + private static final Object lock = new Object(); + + private static long numElementsCollected; + + private static List<Object> message; + + private DummySourceContext() { + numElementsCollected = 0; + message = new ArrayList<Object>(); + } + + @Override + public void collect(Object element) { + message.add(element); + numElementsCollected++; + } + + @Override + public void collectWithTimestamp(Object element, long timestamp) { + message.add(element); + numElementsCollected++; + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return lock; + } + + @Override + public void close() { + + } + } + + private Config getFeederActorConfig() { + String configFile = getClass().getClassLoader() + .getResource("feeder_actor.conf").getFile(); + Config config = ConfigFactory.parseFile(new File(configFile)); + return config; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java new file mode 100644 index 0000000..40d4694 --- /dev/null +++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka.utils; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import org.apache.flink.api.java.tuple.Tuple2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class FeederActor extends UntypedActor { + + public enum MessageTypes { + SINGLE_DATA, ITERABLE_DATA, BYTES_DATA, + SINGLE_DATA_WITH_TIMESTAMP + } + + private static final Logger LOG = LoggerFactory.getLogger(FeederActor.class); + + private final MessageTypes messageType; + + public FeederActor(MessageTypes messageType) { + this.messageType = messageType; + } + + @Override + public void onReceive(Object message) { + if (message instanceof SubscribeReceiver) { + ActorRef receiver = ((SubscribeReceiver) message).getReceiverActor(); + + Object data; + switch (messageType) { + case SINGLE_DATA: + data = createSingleDataMessage(); + break; + case ITERABLE_DATA: + data = createIterableOfMessages(); + break; + case BYTES_DATA: + data = createByteMessages(); + break; + case SINGLE_DATA_WITH_TIMESTAMP: + data = createTimestampMessage(); + break; + default: + throw new RuntimeException("Message format specified is incorrect"); + } + receiver.tell(data, getSelf()); + } else if (message instanceof String) { + Message.ACK_MESSAGE = message.toString(); + } else if (message instanceof UnsubscribeReceiver) { + LOG.info("Stop actor!"); + } + } + + private Object createSingleDataMessage() { + return Message.WELCOME_MESSAGE; + } + + private List<Object> createIterableOfMessages() { + List<Object> messages = new ArrayList<Object>(); + + messages.add(Message.WELCOME_MESSAGE); + messages.add(Message.FEEDER_MESSAGE); + + return messages; + } + + private byte[] createByteMessages() { + byte[] message = Message.WELCOME_MESSAGE.getBytes(); + return message; + } + + private Tuple2<Object, Long> createTimestampMessage() { + Tuple2<Object, Long> message = new Tuple2<Object, Long>(); + message.f0 = Message.WELCOME_MESSAGE; + message.f1 = System.currentTimeMillis(); + + return message; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java new file mode 100644 index 0000000..6f70467 --- /dev/null +++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka.utils; + +public class Message { + public static final String WELCOME_MESSAGE = "welcome receiver"; + public static final String FEEDER_MESSAGE = "this is feeder"; + + public static String ACK_MESSAGE; +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/resources/feeder_actor.conf ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/test/resources/feeder_actor.conf b/flink-connector-akka/src/test/resources/feeder_actor.conf new file mode 100644 index 0000000..a877aa3 --- /dev/null +++ b/flink-connector-akka/src/test/resources/feeder_actor.conf @@ -0,0 +1,33 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +akka { + loglevel = "INFO" + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = 127.0.0.1 + port = 5150 + } + log-sent-messages = on + log-received-messages = on + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/test/resources/log4j.properties b/flink-connector-akka/src/test/resources/log4j.properties new file mode 100644 index 0000000..c82c2c7 --- /dev/null +++ b/flink-connector-akka/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This file ensures that tests executed from the IDE show log output + +log4j.rootLogger=INFO, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ff6da2b..9a15ff4 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,7 @@ <module>flink-connector-flume</module> <module>flink-connector-activemq</module> <module>flink-connector-netty</module> + <module>flink-connector-akka</module> </modules> <properties>