Repository: bahir-flink Updated Branches: refs/heads/master 55605cbde -> 567792b26
[BAHIR-173] Update Flink version to 1.5.1 Closes #29 Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/567792b2 Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/567792b2 Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/567792b2 Branch: refs/heads/master Commit: 567792b26f5b9d80cb423f5d660e3011d475057c Parents: 55605cb Author: Dominik.Wosinski <b...@wp.pl> Authored: Fri Jul 20 23:47:12 2018 +0200 Committer: Luciano Resende <lrese...@apache.org> Committed: Thu Jul 26 09:35:20 2018 -0400 ---------------------------------------------------------------------- .travis.yml | 12 ++----- .../streaming/connectors/activemq/AMQSink.java | 4 +-- .../connectors/activemq/AMQSinkConfig.java | 2 +- .../connectors/activemq/AMQSource.java | 6 ++-- .../connectors/activemq/AMQSourceConfig.java | 2 +- .../connectors/activemq/AMQSinkTest.java | 10 +++--- .../connectors/activemq/AMQSourceTest.java | 14 ++++---- .../activemq/ActiveMQConnectorITCase.java | 13 +++---- flink-connector-akka/pom.xml | 7 +++- .../streaming/connectors/akka/AkkaSource.java | 6 ++-- .../connectors/akka/AkkaSourceTest.java | 6 ++-- .../streaming/connectors/flume/FlumeSink.java | 5 ++- .../connectors/influxdb/InfluxDBSink.java | 2 +- .../connectors/netty/example/NettyUtil.scala | 3 -- .../netty/example/StreamSqlExample.scala | 4 +-- .../streaming/connectors/redis/RedisSink.java | 4 +-- .../connectors/redis/RedisITCaseBase.java | 4 +-- .../operator/StreamInMemOutputHandler.java | 4 +-- .../siddhi/operator/StreamOutputHandler.java | 4 +-- .../flink/streaming/siddhi/SiddhiCEPITCase.java | 4 +-- pom.xml | 36 +------------------- 21 files changed, 58 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index c3baf70..083e75d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,20 +32,12 @@ matrix: include: - jdk: oraclejdk8 env: - - FLINK_VERSION="1.3.0" SCALA_VERSION="2.11" + - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" - CACHE_NAME=JDK8_F130_A - - jdk: oraclejdk8 - env: - - FLINK_VERSION="1.3.0" SCALA_VERSION="2.10" - - CACHE_NAME=JDK8_F130_B - jdk: openjdk8 env: - - FLINK_VERSION="1.3.0" SCALA_VERSION="2.11" + - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" - CACHE_NAME=JDK8_F130_C - - jdk: openjdk8 - env: - - FLINK_VERSION="1.3.0" SCALA_VERSION="2.10" - - CACHE_NAME=JDK8_F130_D before_install: - ./dev/change-scala-version.sh $SCALA_VERSION http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java index a494162..b33bffe 100644 --- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java @@ -18,10 +18,10 @@ package org.apache.flink.streaming.connectors.activemq; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,7 +128,7 @@ public class AMQSink<IN> extends RichSinkFunction<IN> { * The incoming data */ @Override - public void invoke(IN value) { + public void invoke(IN value, Context context) throws Exception { try { byte[] bytes = serializationSchema.serialize(value); BytesMessage message = session.createBytesMessage(); http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java index e10c3c8..51a6aac 100644 --- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.connectors.activemq; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.util.Preconditions; /** http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java index e64b8fd..4f2114f 100644 --- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; @@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener; import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil; import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +40,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import java.util.HashMap; -import java.util.List; +import java.util.Set; /** * Source for reading messages from an ActiveMQ queue. @@ -191,7 +191,7 @@ public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String> } @Override - protected void acknowledgeIDs(long checkpointId, List<String> UIds) { + protected void acknowledgeIDs(long checkpointId, Set<String> UIds) { try { for (String messageId : UIds) { Message unacknowledgedMessage = unacknowledgedMessages.get(messageId); http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java index dd73b0e..06cb267 100644 --- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java +++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java @@ -17,8 +17,8 @@ package org.apache.flink.streaming.connectors.activemq; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.util.Preconditions; /** http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java index e4d67c3..52493d7 100644 --- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java +++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSinkTest.java @@ -19,9 +19,9 @@ package org.apache.flink.streaming.connectors.activemq; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.junit.Before; import org.junit.Test; @@ -83,7 +83,7 @@ public class AMQSinkTest { @Test public void messageSentToProducer() throws Exception { byte[] expectedMessage = serializationSchema.serialize("msg"); - amqSink.invoke("msg"); + amqSink.invoke("msg", null); verify(producer).send(message); verify(message).writeBytes(expectedMessage); @@ -121,7 +121,7 @@ public class AMQSinkTest { when(session.createBytesMessage()).thenThrow(JMSException.class); amqSink.setLogFailuresOnly(true); - amqSink.invoke("msg"); + amqSink.invoke("msg", null); } @SuppressWarnings("unchecked") @@ -129,7 +129,7 @@ public class AMQSinkTest { public void exceptionOnSendAreThrownByDefault() throws Exception { when(session.createBytesMessage()).thenThrow(JMSException.class); - amqSink.invoke("msg"); + amqSink.invoke("msg", null); } @Test http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java index 0e6dd31..9c7be72 100644 --- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java +++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.configuration.Configuration; @@ -27,7 +28,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener; import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -161,7 +161,7 @@ public class AMQSourceTest { @Test public void acknowledgeReceivedMessage() throws Exception { amqSource.run(context); - amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID)); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton(MSG_ID)); verify(message).acknowledge(); } @@ -169,7 +169,7 @@ public class AMQSourceTest { @Test public void handleUnknownIds() throws Exception { amqSource.run(context); - amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList("unknown-id")); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton("unknown-id")); verify(message, never()).acknowledge(); } @@ -177,8 +177,8 @@ public class AMQSourceTest { @Test public void doNotAcknowledgeMessageTwice() throws Exception { amqSource.run(context); - amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID)); - amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID)); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton(MSG_ID)); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton(MSG_ID)); verify(message, times(1)).acknowledge(); } @@ -196,7 +196,7 @@ public class AMQSourceTest { doThrow(JMSException.class).when(message).acknowledge(); amqSource.run(context); - amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID)); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton(MSG_ID)); } @Test @@ -206,7 +206,7 @@ public class AMQSourceTest { doThrow(JMSException.class).when(message).acknowledge(); amqSource.run(context); - amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singletonList(MSG_ID)); + amqSource.acknowledgeIDs(CHECKPOINT_ID, Collections.singleton(MSG_ID)); } @Test http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java index 9af6dd5..3b31fc4 100644 --- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java +++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java @@ -20,8 +20,10 @@ package org.apache.flink.streaming.connectors.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -30,7 +32,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.test.util.SuccessException; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -60,8 +61,8 @@ public class ActiveMQConnectorITCase { // start also a re-usable Flink mini cluster Configuration flinkConfig = new Configuration(); flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS.key(), 8); + flinkConfig.setInteger(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); flink = new LocalFlinkMiniCluster(flinkConfig, false); @@ -74,7 +75,7 @@ public class ActiveMQConnectorITCase { public static void afterClass() { flinkPort = -1; if (flink != null) { - flink.shutdown(); + flink.startInternalShutdown(); } } @@ -158,7 +159,7 @@ public class ActiveMQConnectorITCase { .addSink(new SinkFunction<String>() { final HashSet<Integer> set = new HashSet<>(); @Override - public void invoke(String value) throws Exception { + public void invoke(String value, Context context) throws Exception { int val = Integer.parseInt(value.split("-")[1]); set.add(val); @@ -181,7 +182,7 @@ public class ActiveMQConnectorITCase { sink.open(new Configuration()); for (int i = 0; i < MESSAGES_NUM; i++) { - sink.invoke("amq-" + i); + sink.invoke("amq-" + i, null); } AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>() http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-akka/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-akka/pom.xml b/flink-connector-akka/pom.xml index 5eb261b..3dab6b8 100644 --- a/flink-connector-akka/pom.xml +++ b/flink-connector-akka/pom.xml @@ -35,11 +35,16 @@ under the License. <properties> <mockito.version>1.10.19</mockito.version> - <akka.version>2.3.15</akka.version> + <akka.version>2.4.20</akka.version> </properties> <dependencies> <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java index 3925d0b..d8a6919 100644 --- a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java +++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java @@ -30,6 +30,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import java.util.Collections; @@ -95,7 +97,7 @@ public class AkkaSource extends RichSourceFunction<Object> Props.create(classForActor, ctx, urlOfPublisher, autoAck), actorName); LOG.info("Started the Receiver actor {} successfully", actorName); - receiverActorSystem.awaitTermination(); + Await.result(receiverActorSystem.whenTerminated(), Duration.Inf()); } @Override @@ -103,7 +105,7 @@ public class AkkaSource extends RichSourceFunction<Object> LOG.info("Closing source"); if (receiverActorSystem != null) { receiverActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - receiverActorSystem.shutdown(); + receiverActorSystem.terminate(); } } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java index 99a1893..6327bdd 100644 --- a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java +++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java @@ -34,6 +34,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import java.io.File; import java.util.ArrayList; @@ -80,8 +82,8 @@ public class AkkaSourceTest { @After public void afterTest() throws Exception { - feederActorSystem.shutdown(); - feederActorSystem.awaitTermination(); + feederActorSystem.terminate(); + Await.result(feederActorSystem.whenTerminated(), Duration.Inf()); source.cancel(); sourceThread.join(); http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java index 00bfc39..41b1b25 100644 --- a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java +++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java @@ -17,10 +17,10 @@ package org.apache.flink.streaming.connectors.flume; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; @@ -55,8 +55,7 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> { * The tuple arriving from the datastream */ @Override - public void invoke(IN value) { - + public void invoke(IN value, Context context) throws Exception { byte[] data = schema.serialize(value); client.sendDataToFlume(data); http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java index 03521b9..e7f8916 100644 --- a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java +++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java @@ -76,7 +76,7 @@ public class InfluxDBSink extends RichSinkFunction<InfluxDBPoint> { * @param dataPoint {@link InfluxDBPoint} */ @Override - public void invoke(InfluxDBPoint dataPoint) throws Exception { + public void invoke(InfluxDBPoint dataPoint, Context context) throws Exception { if (StringUtils.isNullOrWhitespaceOnly(dataPoint.getMeasurement())) { throw new RuntimeException("No measurement defined"); } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala ---------------------------------------------------------------------- diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala index 56c551d..f6068bc 100644 --- a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala +++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala @@ -21,7 +21,6 @@ import java.io.{BufferedReader, InputStreamReader} import java.net._ import org.apache.commons.lang3.SystemUtils -import org.mortbay.util.MultiException import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ @@ -153,8 +152,6 @@ object NettyUtil { exception match { case e: BindException if e.getMessage != null => true case e: BindException => isBindCollision(e.getCause) - case e: MultiException => - e.getThrowables.asScala.toList.map(_.asInstanceOf[Throwable]).exists(isBindCollision) case e: Exception => isBindCollision(e.getCause) case _ => false } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala ---------------------------------------------------------------------- diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala index ee634d0..5167b3e 100644 --- a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala +++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala @@ -60,9 +60,9 @@ object StreamSqlExample { tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount) // union the two tables - val result = tEnv.sql("SELECT STREAM * FROM OrderA WHERE amount > 2") + val result = tEnv.sqlQuery("SELECT STREAM * FROM OrderA WHERE amount > 2") - result.toDataStream[Order].print() + result.toAppendStream[Order].print() env.execute() } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java index 9138862..6a03f11 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java @@ -26,8 +26,8 @@ import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSenti import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; -import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; import org.slf4j.Logger; @@ -125,7 +125,7 @@ public class RedisSink<IN> extends RichSinkFunction<IN> { * @param input The incoming data */ @Override - public void invoke(IN input) throws Exception { + public void invoke(IN input, Context context) throws Exception { String key = redisSinkMapper.getKeyFromData(input); String value = redisSinkMapper.getValueFromData(input); http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java index 18cdc64..2bbbcb7 100644 --- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java +++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java @@ -16,7 +16,7 @@ */ package org.apache.flink.streaming.connectors.redis; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.junit.AfterClass; import org.junit.BeforeClass; import redis.embedded.RedisServer; @@ -25,7 +25,7 @@ import java.io.IOException; import static org.apache.flink.util.NetUtils.getAvailablePort; -public abstract class RedisITCaseBase extends StreamingMultipleProgramsTestBase { +public abstract class RedisITCaseBase extends AbstractTestBase { public static final int REDIS_PORT = getAvailablePort(); public static final String REDIS_HOST = "127.0.0.1"; http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java index 7af37ce..49c27c2 100755 --- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java @@ -24,6 +24,8 @@ import java.util.Map; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.slf4j.Logger; @@ -31,8 +33,6 @@ import org.slf4j.LoggerFactory; import org.wso2.siddhi.core.event.Event; import org.wso2.siddhi.core.stream.output.StreamCallback; import org.wso2.siddhi.query.api.definition.AbstractDefinition; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; /** * Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type, http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java index 8840dac..1c1096c 100755 --- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java @@ -17,11 +17,11 @@ package org.apache.flink.streaming.siddhi.operator; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java index 821c594..651288f 100755 --- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java @@ -41,7 +41,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.flink.streaming.api.operators.StreamMap; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -52,7 +52,7 @@ import static org.junit.Assert.assertEquals; /** * Flink-siddhi library integration test cases */ -public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implements Serializable { +public class SiddhiCEPITCase extends AbstractTestBase implements Serializable { @Rule public transient TemporaryFolder tempFolder = new TemporaryFolder(); http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/567792b2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c2d36ed..d2d435c 100644 --- a/pom.xml +++ b/pom.xml @@ -93,7 +93,7 @@ <log4j.version>1.2.17</log4j.version> <!-- Flink version --> - <flink.version>1.3.0</flink.version> + <flink.version>1.5.1</flink.version> <PermGen>64m</PermGen> <MaxPermGen>512m</MaxPermGen> @@ -704,40 +704,6 @@ <module>distribution</module> </modules> </profile> - - <profile> - <id>scala-2.10</id> - <properties> - <scala.version>2.10.5</scala.version> - <scala.binary.version>2.10</scala.binary.version> - </properties> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-enforcer-plugin</artifactId> - <executions> - <execution> - <id>enforce-versions</id> - <goals> - <goal>enforce</goal> - </goals> - <configuration> - <rules> - <bannedDependencies> - <excludes combine.children="append"> - <exclude>*:*_2.11</exclude> - </excludes> - </bannedDependencies> - </rules> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - <profile> <id>scala-2.11</id> <activation>