Repository: bahir-flink
Updated Branches:
  refs/heads/master 55605cbde -> 567792b26


[BAHIR-173] Update Flink version to 1.5.1

Closes #29


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/567792b2
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/567792b2
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/567792b2

Branch: refs/heads/master
Commit: 567792b26f5b9d80cb423f5d660e3011d475057c
Parents: 55605cb
Author: Dominik.Wosinski <b...@wp.pl>
Authored: Fri Jul 20 23:47:12 2018 +0200
Committer: Luciano Resende <lrese...@apache.org>
Committed: Thu Jul 26 09:35:20 2018 -0400

----------------------------------------------------------------------
 .travis.yml                                     | 12 ++-----
 .../streaming/connectors/activemq/AMQSink.java  |  4 +--
 .../connectors/activemq/AMQSinkConfig.java      |  2 +-
 .../connectors/activemq/AMQSource.java          |  6 ++--
 .../connectors/activemq/AMQSourceConfig.java    |  2 +-
 .../connectors/activemq/AMQSinkTest.java        | 10 +++---
 .../connectors/activemq/AMQSourceTest.java      | 14 ++++----
 .../activemq/ActiveMQConnectorITCase.java       | 13 +++----
 flink-connector-akka/pom.xml                    |  7 +++-
 .../streaming/connectors/akka/AkkaSource.java   |  6 ++--
 .../connectors/akka/AkkaSourceTest.java         |  6 ++--
 .../streaming/connectors/flume/FlumeSink.java   |  5 ++-
 .../connectors/influxdb/InfluxDBSink.java       |  2 +-
 .../connectors/netty/example/NettyUtil.scala    |  3 --
 .../netty/example/StreamSqlExample.scala        |  4 +--
 .../streaming/connectors/redis/RedisSink.java   |  4 +--
 .../connectors/redis/RedisITCaseBase.java       |  4 +--
 .../operator/StreamInMemOutputHandler.java      |  4 +--
 .../siddhi/operator/StreamOutputHandler.java    |  4 +--
 .../flink/streaming/siddhi/SiddhiCEPITCase.java |  4 +--
 pom.xml                                         | 36 +-------------------
 21 files changed, 58 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index c3baf70..083e75d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -32,20 +32,12 @@ matrix:
   include:
     - jdk: oraclejdk8
       env:
-        - FLINK_VERSION="1.3.0" SCALA_VERSION="2.11"
+        - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
         - CACHE_NAME=JDK8_F130_A
-    - jdk: oraclejdk8
-      env:
-        - FLINK_VERSION="1.3.0" SCALA_VERSION="2.10"
-        - CACHE_NAME=JDK8_F130_B
     - jdk: openjdk8
       env:
-        - FLINK_VERSION="1.3.0" SCALA_VERSION="2.11"
+        - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
         - CACHE_NAME=JDK8_F130_C
-    - jdk: openjdk8
-      env:
-        - FLINK_VERSION="1.3.0" SCALA_VERSION="2.10"
-        - CACHE_NAME=JDK8_F130_D
 
 before_install:
   - ./dev/change-scala-version.sh $SCALA_VERSION

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
index a494162..b33bffe 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
@@ -18,10 +18,10 @@
 package org.apache.flink.streaming.connectors.activemq;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -128,7 +128,7 @@ public class AMQSink<IN> extends RichSinkFunction<IN> {
      *            The incoming data
      */
     @Override
-    public void invoke(IN value) {
+    public void invoke(IN value, Context context) throws Exception {
         try {
             byte[] bytes = serializationSchema.serialize(value);
             BytesMessage message = session.createBytesMessage();

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
index e10c3c8..51a6aac 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
@@ -17,7 +17,7 @@
 package org.apache.flink.streaming.connectors.activemq;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.util.Preconditions;
 
 /**

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
index e64b8fd..4f2114f 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.activemq;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
@@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
 import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil;
 import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +40,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Set;
 
 /**
  * Source for reading messages from an ActiveMQ queue.
@@ -191,7 +191,7 @@ public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
     }
 
     @Override
-    protected void acknowledgeIDs(long checkpointId, List<String> UIds) {
+    protected void acknowledgeIDs(long checkpointId, Set<String> UIds) {
         try {
             for (String messageId : UIds) {
                 Message unacknowledgedMessage = unacknowledgedMessages.get(messageId);

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
index dd73b0e..06cb267 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
@@ -17,8 +17,8 @@
 package org.apache.flink.streaming.connectors.activemq;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Preconditions;
 
 /**

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java
index e4d67c3..52493d7 100644
--- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.activemq;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -83,7 +83,7 @@ public class AMQSinkTest {
     @Test
     public void messageSentToProducer() throws Exception {
         byte[] expectedMessage = serializationSchema.serialize("msg");
-        amqSink.invoke("msg");
+        amqSink.invoke("msg", null);
 
         verify(producer).send(message);
         verify(message).writeBytes(expectedMessage);
@@ -121,7 +121,7 @@ public class AMQSinkTest {
         when(session.createBytesMessage()).thenThrow(JMSException.class);
         amqSink.setLogFailuresOnly(true);
 
-        amqSink.invoke("msg");
+        amqSink.invoke("msg", null);
     }
 
     @SuppressWarnings("unchecked")
@@ -129,7 +129,7 @@ public class AMQSinkTest {
     public void exceptionOnSendAreThrownByDefault() throws Exception {
         when(session.createBytesMessage()).thenThrow(JMSException.class);
 
-        amqSink.invoke("msg");
+        amqSink.invoke("msg", null);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
index 0e6dd31..9c7be72 100644
--- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.activemq;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.configuration.Configuration;
@@ -27,7 +28,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
 import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -161,7 +161,7 @@ public class AMQSourceTest {
     @Test
     public void acknowledgeReceivedMessage() throws Exception {
         amqSource.run(context);
-        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton(MSG_ID));
 
         verify(message).acknowledge();
     }
@@ -169,7 +169,7 @@ public class AMQSourceTest {
     @Test
     public void handleUnknownIds() throws Exception {
         amqSource.run(context);
-        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList("unknown-id"));
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton("unknown-id"));
 
         verify(message, never()).acknowledge();
     }
@@ -177,8 +177,8 @@ public class AMQSourceTest {
     @Test
     public void doNotAcknowledgeMessageTwice() throws Exception {
         amqSource.run(context);
-        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
-        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton(MSG_ID));
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton(MSG_ID));
 
         verify(message, times(1)).acknowledge();
     }
@@ -196,7 +196,7 @@ public class AMQSourceTest {
         doThrow(JMSException.class).when(message).acknowledge();
 
         amqSource.run(context);
-        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton(MSG_ID));
     }
 
     @Test
@@ -206,7 +206,7 @@ public class AMQSourceTest {
         doThrow(JMSException.class).when(message).acknowledge();
 
         amqSource.run(context);
-        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID));
+        amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton(MSG_ID));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
index 9af6dd5..3b31fc4 100644
--- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
+++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java
@@ -20,8 +20,10 @@ package org.apache.flink.streaming.connectors.activemq;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -30,7 +32,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -60,8 +61,8 @@ public class ActiveMQConnectorITCase {
         // start also a re-usable Flink mini cluster
         Configuration flinkConfig = new Configuration();
         flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-        flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-        flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+        flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS.key(), 8);
+        flinkConfig.setInteger(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), 16);
         flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
         flink = new LocalFlinkMiniCluster(flinkConfig, false);
@@ -74,7 +75,7 @@ public class ActiveMQConnectorITCase {
     public static void afterClass() {
         flinkPort = -1;
         if (flink != null) {
-            flink.shutdown();
+            flink.startInternalShutdown();
         }
     }
 
@@ -158,7 +159,7 @@ public class ActiveMQConnectorITCase {
             .addSink(new SinkFunction<String>() {
                 final HashSet<Integer> set = new HashSet<>();
                 @Override
-                public void invoke(String value) throws Exception {
+                public void invoke(String value, Context context) throws Exception {
                     int val = Integer.parseInt(value.split("-")[1]);
                     set.add(val);
 
@@ -181,7 +182,7 @@ public class ActiveMQConnectorITCase {
         sink.open(new Configuration());
 
         for (int i = 0; i < MESSAGES_NUM; i++) {
-            sink.invoke("amq-" + i);
+            sink.invoke("amq-" + i, null);
         }
 
         AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-akka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-akka/pom.xml b/flink-connector-akka/pom.xml
index 5eb261b..3dab6b8 100644
--- a/flink-connector-akka/pom.xml
+++ b/flink-connector-akka/pom.xml
@@ -35,11 +35,16 @@ under the License.
 
     <properties>
         <mockito.version>1.10.19</mockito.version>
-        <akka.version>2.3.15</akka.version>
+        <akka.version>2.4.20</akka.version>
     </properties>
 
     <dependencies>
         <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java
index 3925d0b..d8a6919 100644
--- a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java
@@ -30,6 +30,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 
 import java.util.Collections;
 
@@ -95,7 +97,7 @@ public class AkkaSource extends RichSourceFunction<Object>
       Props.create(classForActor, ctx, urlOfPublisher, autoAck), actorName);
 
     LOG.info("Started the Receiver actor {} successfully", actorName);
-    receiverActorSystem.awaitTermination();
+    Await.result(receiverActorSystem.whenTerminated(), Duration.Inf());
   }
 
   @Override
@@ -103,7 +105,7 @@ public class AkkaSource extends RichSourceFunction<Object>
     LOG.info("Closing source");
     if (receiverActorSystem != null) {
       receiverActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-      receiverActorSystem.shutdown();
+      receiverActorSystem.terminate();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java
index 99a1893..6327bdd 100644
--- a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java
+++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java
@@ -34,6 +34,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -80,8 +82,8 @@ public class AkkaSourceTest {
 
   @After
   public void afterTest() throws Exception {
-    feederActorSystem.shutdown();
-    feederActorSystem.awaitTermination();
+    feederActorSystem.terminate();
+    Await.result(feederActorSystem.whenTerminated(), Duration.Inf());
 
     source.cancel();
     sourceThread.join();

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 00bfc39..41b1b25 100644
--- a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.streaming.connectors.flume;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -55,8 +55,7 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
      *            The tuple arriving from the datastream
      */
     @Override
-    public void invoke(IN value) {
-
+    public void invoke(IN value, Context context) throws Exception {
         byte[] data = schema.serialize(value);
         client.sendDataToFlume(data);
 

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
index 03521b9..e7f8916 100644
--- a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
+++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
@@ -76,7 +76,7 @@ public class InfluxDBSink extends RichSinkFunction<InfluxDBPoint> {
      * @param dataPoint {@link InfluxDBPoint}
      */
     @Override
-    public void invoke(InfluxDBPoint dataPoint) throws Exception {
+    public void invoke(InfluxDBPoint dataPoint, Context context) throws Exception {
         if (StringUtils.isNullOrWhitespaceOnly(dataPoint.getMeasurement())) {
             throw new RuntimeException("No measurement defined");
         }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
index 56c551d..f6068bc 100644
--- a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
+++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
@@ -21,7 +21,6 @@ import java.io.{BufferedReader, InputStreamReader}
 import java.net._
 
 import org.apache.commons.lang3.SystemUtils
-import org.mortbay.util.MultiException
 import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConverters._
@@ -153,8 +152,6 @@ object NettyUtil {
     exception match {
       case e: BindException if e.getMessage != null => true
       case e: BindException => isBindCollision(e.getCause)
-      case e: MultiException =>
-        e.getThrowables.asScala.toList.map(_.asInstanceOf[Throwable]).exists(isBindCollision)
       case e: Exception => isBindCollision(e.getCause)
       case _ => false
     }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
index ee634d0..5167b3e 100644
--- a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
@@ -60,9 +60,9 @@ object StreamSqlExample {
     tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
 
     // union the two tables
-    val result = tEnv.sql("SELECT STREAM * FROM OrderA WHERE amount > 2")
+    val result = tEnv.sqlQuery("SELECT STREAM * FROM OrderA WHERE amount > 2")
 
-    result.toDataStream[Order].print()
+    result.toAppendStream[Order].print()
 
     env.execute()
   }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 9138862..6a03f11 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -26,8 +26,8 @@ import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSenti
 import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
 import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 
 import org.slf4j.Logger;
@@ -125,7 +125,7 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
      * @param input The incoming data
      */
     @Override
-    public void invoke(IN input) throws Exception {
+    public void invoke(IN input, Context context) throws Exception {
         String key = redisSinkMapper.getKeyFromData(input);
         String value = redisSinkMapper.getValueFromData(input);
 

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
index 18cdc64..2bbbcb7 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
@@ -16,7 +16,7 @@
  */
 package org.apache.flink.streaming.connectors.redis;
 
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import redis.embedded.RedisServer;
@@ -25,7 +25,7 @@ import java.io.IOException;
 
 import static org.apache.flink.util.NetUtils.getAvailablePort;
 
-public abstract class RedisITCaseBase extends StreamingMultipleProgramsTestBase {
+public abstract class RedisITCaseBase extends AbstractTestBase {
 
     public static final int REDIS_PORT = getAvailablePort();
     public static final String REDIS_HOST = "127.0.0.1";

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
index 7af37ce..49c27c2 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
@@ -31,8 +33,6 @@ import org.slf4j.LoggerFactory;
 import org.wso2.siddhi.core.event.Event;
 import org.wso2.siddhi.core.stream.output.StreamCallback;
 import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
  * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type,

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
index 8840dac..1c1096c 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.streaming.siddhi.operator;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
index 821c594..651288f 100755
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
@@ -41,7 +41,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -52,7 +52,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Flink-siddhi library integration test cases
  */
-public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implements Serializable {
+public class SiddhiCEPITCase extends AbstractTestBase implements Serializable {
 
     @Rule
     public transient TemporaryFolder tempFolder = new TemporaryFolder();

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c2d36ed..d2d435c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,7 +93,7 @@
     <log4j.version>1.2.17</log4j.version>
 
     <!-- Flink version -->
-    <flink.version>1.3.0</flink.version>
+    <flink.version>1.5.1</flink.version>
 
     <PermGen>64m</PermGen>
     <MaxPermGen>512m</MaxPermGen>
@@ -704,40 +704,6 @@
         <module>distribution</module>
       </modules>
     </profile>
-
-    <profile>
-      <id>scala-2.10</id>
-      <properties>
-        <scala.version>2.10.5</scala.version>
-        <scala.binary.version>2.10</scala.binary.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-enforcer-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>enforce-versions</id>
-                <goals>
-                  <goal>enforce</goal>
-                </goals>
-                <configuration>
-                  <rules>
-                    <bannedDependencies>
-                      <excludes combine.children="append">
-                        <exclude>*:*_2.11</exclude>
-                      </excludes>
-                    </bannedDependencies>
-                  </rules>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-
     <profile>
       <id>scala-2.11</id>
       <activation>

Reply via email to