Repository: bahir-flink Updated Branches: refs/heads/master 9f306889f -> 3f180342c
[BAHIR-91] Upgrade Flink version to 1.2.0 This closes #11 This closes #9 (Closing PR due to inactivity) Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/3f180342 Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/3f180342 Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/3f180342 Branch: refs/heads/master Commit: 3f180342ca0efaf48ffa4d1f7b6dbfcb4cd892b6 Parents: 9f30688 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Wed Mar 1 14:41:21 2017 +0800 Committer: Robert Metzger <[email protected]> Committed: Sat Mar 4 16:46:50 2017 +0100 ---------------------------------------------------------------------- .travis.yml | 5 ++--- flink-connector-activemq/pom.xml | 1 + .../connectors/activemq/AMQSourceTest.java | 19 ++++++++++++++++++ .../activemq/ActiveMQConnectorITCase.java | 21 +++++++++++++++----- flink-connector-redis/pom.xml | 11 ++++++++++ pom.xml | 2 +- 6 files changed, 50 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index fd3733a..53f31a5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,8 +18,7 @@ language: java env: -# - FLINK_VERSION="1.1.0" - - FLINK_VERSION="1.1.1" + - FLINK_VERSION="1.2.0" jdk: - oraclejdk8 @@ -28,4 +27,4 @@ jdk: install: true -script: mvn clean verify -Dflink.version=$FLINK_VERSION \ No newline at end of file +script: mvn clean verify -Dflink.version=$FLINK_VERSION http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-activemq/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml index d76ca89..b947f8e 100644 --- a/flink-connector-activemq/pom.xml +++ b/flink-connector-activemq/pom.xml @@ -85,6 +85,7 @@ under the License. <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> <groupId>org.apache.activemq.tooling</groupId> <artifactId>activemq-junit</artifactId> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java index 05d0d60..2e6efa6 100644 --- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java +++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/AMQSourceTest.java @@ -19,7 +19,10 @@ package org.apache.flink.streaming.connectors.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener; @@ -97,6 +100,22 @@ public class AMQSourceTest { amqSource = new AMQSource<>(config); amqSource.setRuntimeContext(createRuntimeContext()); amqSource.open(new Configuration()); + amqSource.initializeState(new FunctionInitializationContext() { + @Override + public boolean isRestored() { + return false; + } + + @Override + public OperatorStateStore getOperatorStateStore() { + return mock(OperatorStateStore.class); + } + + @Override + public KeyedStateStore getKeyedStateStore() { + return mock(KeyedStateStore.class); + } + }); } private RuntimeContext createRuntimeContext() { http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java index 985e06d..24a257f 100644 --- a/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java +++ b/flink-connector-activemq/src/test/java/org/apache/flink/streaming/connectors/activemq/ActiveMQConnectorITCase.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -29,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.SuccessException; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -51,7 +52,7 @@ public class ActiveMQConnectorITCase { public static final int MESSAGES_NUM = 10000; public static final String QUEUE_NAME = "queue"; public static final String TOPIC_NAME = "topic"; - private static ForkableFlinkMiniCluster flink; + private static LocalFlinkMiniCluster flink; private static int flinkPort; @BeforeClass @@ -63,7 +64,7 @@ public class ActiveMQConnectorITCase { flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - flink = new ForkableFlinkMiniCluster(flinkConfig, false); + flink = new LocalFlinkMiniCluster(flinkConfig, false); flink.start(); flinkPort = flink.getLeaderRPCPort(); @@ -211,9 +212,19 @@ public class ActiveMQConnectorITCase { while (deadline.hasTimeLeft() && sourceContext.getIdsNum() < MESSAGES_NUM) { Thread.sleep(100); Random random = new Random(); - long checkpointId = random.nextLong(); + final long checkpointId = random.nextLong(); synchronized (sourceContext.getCheckpointLock()) { - source.snapshotState(checkpointId, System.currentTimeMillis()); + source.snapshotState(new FunctionSnapshotContext() { + @Override + public long getCheckpointId() { + return checkpointId; + } + + @Override + public long getCheckpointTimestamp() { + return System.currentTimeMillis(); + } + }); source.notifyCheckpointComplete(checkpointId); } } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/flink-connector-redis/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml index 7920ca5..2412822 100644 --- a/flink-connector-redis/pom.xml +++ b/flink-connector-redis/pom.xml @@ -74,4 +74,15 @@ under the License. <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <inherited>true</inherited> + <extensions>true</extensions> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/3f180342/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 01818b0..b8458cb 100644 --- a/pom.xml +++ b/pom.xml @@ -90,7 +90,7 @@ <log4j.version>1.2.17</log4j.version> <!-- Flink version --> - <flink.version>1.1.1</flink.version> + <flink.version>1.2.0</flink.version> <PermGen>64m</PermGen> <MaxPermGen>512m</MaxPermGen>
