This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch BAHIR-308
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/BAHIR-308 by this push:
new d23ffb9 [BAHIR-308] Bump flink version to 1.16.1
d23ffb9 is described below
commit d23ffb9a05c3bad0b8b8703ca21dd22fc8f79b9a
Author: Joao Boto <[email protected]>
AuthorDate: Wed Mar 8 17:21:45 2023 +0100
[BAHIR-308] Bump flink version to 1.16.1
---
.github/workflows/maven-ci.yml | 2 +-
.../streaming/connectors/flume/FlumeSinkTest.java | 8 +-
.../connectors/redis/RedisDescriptorTest.java | 27 ++--
.../flink/streaming/siddhi/SiddhiCEPITCase.java | 172 ++++++++++++++++++++-
pom.xml | 2 +-
5 files changed, 186 insertions(+), 25 deletions(-)
diff --git a/.github/workflows/maven-ci.yml b/.github/workflows/maven-ci.yml
index ef344b9..6488728 100644
--- a/.github/workflows/maven-ci.yml
+++ b/.github/workflows/maven-ci.yml
@@ -31,7 +31,7 @@ jobs:
fail-fast: false
matrix:
java: ['8', '11']
- flink-version: ['1.15.3']
+ flink-version: ['1.16.1']
connector: [
'flink-connector-activemq',
'flink-connector-akka',
diff --git
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
index c212fdf..f59bde8 100644
---
a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
+++
b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
@@ -22,9 +22,7 @@ import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.junit.jupiter.api.Test;
-import java.nio.charset.Charset;
-
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import java.nio.charset.StandardCharsets;
public class FlumeSinkTest extends FlumeServerTest {
@@ -35,7 +33,7 @@ public class FlumeSinkTest extends FlumeServerTest {
FlumeEventBuilder<String> flumeEventBuilder = new
FlumeEventBuilder<String>() {
@Override
public Event createFlumeEvent(String value, RuntimeContext ctx) {
- return EventBuilder.withBody(value, Charset.forName("UTF-8"));
+ return EventBuilder.withBody(value, StandardCharsets.UTF_8);
}
};
@@ -43,7 +41,7 @@ public class FlumeSinkTest extends FlumeServerTest {
environment.fromElements("string1", "string2").addSink(flumeSink);
- tryExecute(environment, "FlumeTest");
+ environment.executeAsync( "FlumeTest");
}
}
diff --git
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
index abb1b2e..6754170 100644
---
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
+++
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
@@ -24,6 +24,7 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.expressions.Expression;
import org.apache.flink.types.Row;
import org.junit.Before;
import org.junit.Test;
@@ -49,7 +50,9 @@ public class RedisDescriptorTest extends RedisITCaseBase {
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
- tableEnvironment.registerDataStream("t1", source, "k, v");
+
+ Table table = tableEnvironment.fromDataStream(source);
+ tableEnvironment.createTemporaryView("t1", table);
/*Redis redis = new Redis()
.mode(RedisValidator.REDIS_CLUSTER)
@@ -57,17 +60,17 @@ public class RedisDescriptorTest extends RedisITCaseBase {
.ttl(100000)
.property(RedisValidator.REDIS_NODES, REDIS_HOST+ ":" +
REDIS_PORT);*/
- tableEnvironment.executeSql("create table redis " +
- "(k string, " +
- "v bigint) " +
- "with (" +
- "'connector.type'='redis'," +
- "'redis-mode'='cluster'," +
- "'cluster-nodes'='"+String.format("%s:%s",REDIS_HOST,
REDIS_PORT)+"'," +
- "'command'='INCRBY_EX'," +
- "'key.ttl'='100000')");
-
- tableEnvironment.executeSql("insert into redis select k, v from t1");
+ tableEnvironment.executeSql(
+ "create table redis (key string, number bigint) " +
+ "with (" +
+ "'connector.type'='redis'," +
+ "'redis-mode'='cluster'," +
+ "'cluster-nodes'='"+String.format("%s:%s",REDIS_HOST,
REDIS_PORT)+"'," +
+ "'command'='INCRBY_EX'," +
+ "'key.ttl'='100000'" +
+ ")");
+
+ tableEnvironment.executeSql("insert into redis select * from t1");
}
@Test
diff --git
a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
index a0d9935..1422531 100755
---
a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
+++
b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
@@ -41,14 +41,20 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
import static org.junit.Assert.assertEquals;
-
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Flink-siddhi library integration test cases
*/
@@ -402,4 +408,158 @@ public class SiddhiCEPITCase extends AbstractTestBase
implements Serializable {
output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
env.execute();
}
+
+ // All code below is migrated from TestBaseUtils.java (in version 1.15)
+ public static void compareResultsByLinesInMemory(String expectedResultStr,
String resultPath)
+ throws Exception {
+ compareResultsByLinesInMemory(expectedResultStr, resultPath, new
String[0]);
+ }
+
+ public static void compareResultsByLinesInMemory(
+ String expectedResultStr, String resultPath, String[]
excludePrefixes)
+ throws Exception {
+
+ ArrayList<String> list = new ArrayList<>();
+ readAllResultLines(list, resultPath, excludePrefixes, false);
+
+ String[] result = list.toArray(new String[list.size()]);
+ Arrays.sort(result);
+
+ String[] expected =
+ expectedResultStr.isEmpty() ? new String[0] :
expectedResultStr.split("\n");
+ Arrays.sort(expected);
+
+ if (expected.length != result.length || !Arrays.deepEquals(expected,
result)) {
+ String msg =
+ String.format(
+ "Different elements in arrays: expected %d
elements and received %d\n"
+ + "files: %s\n expected: %s\n received:
%s",
+ expected.length,
+ result.length,
+ Arrays.toString(getAllInvolvedFiles(resultPath,
excludePrefixes)),
+ Arrays.toString(expected),
+ Arrays.toString(result));
+ fail(msg);
+ }
+ }
+
+ public static void readAllResultLines(List<String> target, String
resultPath)
+ throws IOException {
+ readAllResultLines(target, resultPath, new String[] {});
+ }
+
+ public static void readAllResultLines(
+ List<String> target, String resultPath, String[] excludePrefixes)
throws IOException {
+
+ readAllResultLines(target, resultPath, excludePrefixes, false);
+ }
+
+ public static void readAllResultLines(
+ List<String> target,
+ String resultPath,
+ String[] excludePrefixes,
+ boolean inOrderOfFiles)
+ throws IOException {
+
+ checkArgument(resultPath != null, "resultPath cannot be be null");
+
+ final BufferedReader[] readers =
+ getResultReader(resultPath, excludePrefixes, inOrderOfFiles);
+ try {
+ for (BufferedReader reader : readers) {
+ String s;
+ while ((s = reader.readLine()) != null) {
+ target.add(s);
+ }
+ }
+ } finally {
+ for (BufferedReader reader : readers) {
+ org.apache.flink.util.IOUtils.closeQuietly(reader);
+ }
+ }
+ }
+
+ public static BufferedReader[] getResultReader(
+ String resultPath, String[] excludePrefixes, boolean
inOrderOfFiles)
+ throws IOException {
+
+ File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
+
+ if (inOrderOfFiles) {
+ // sort the files by their numeric names (1, 2, 3, 4)...
+ // we cannot sort by path, because path strings sort lexicographically, not numerically
+ Arrays.sort(
+ files,
+ new Comparator<File>() {
+
+ @Override
+ public int compare(File o1, File o2) {
+ try {
+ int f1 = Integer.parseInt(o1.getName());
+ int f2 = Integer.parseInt(o2.getName());
+ return f1 < f2 ? -1 : (f1 > f2 ? 1 : 0);
+ } catch (NumberFormatException e) {
+ throw new RuntimeException(
+ "The file names are no numbers and
cannot be ordered: "
+ + o1.getName()
+ + "/"
+ + o2.getName());
+ }
+ }
+ });
+ }
+
+ BufferedReader[] readers = new BufferedReader[files.length];
+ for (int i = 0; i < files.length; i++) {
+ readers[i] =
+ new BufferedReader(
+ new InputStreamReader(
+ new FileInputStream(files[i]),
StandardCharsets.UTF_8));
+ }
+ return readers;
+ }
+
+ private static File[] getAllInvolvedFiles(String resultPath, final
String[] excludePrefixes) {
+ final File result = asFile(resultPath);
+ assertTrue("Result file was not written", result.exists());
+
+ if (result.isDirectory()) {
+ try {
+ return Files.walk(result.toPath())
+ .filter(Files::isRegularFile)
+ .filter(
+ path -> {
+ for (String prefix : excludePrefixes) {
+ if
(path.getFileName().startsWith(prefix)) {
+ return false;
+ }
+ }
+
+ return true;
+ })
+ .map(Path::toFile)
+ .filter(file -> !file.isHidden())
+ .toArray(File[]::new);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to retrieve result files");
+ }
+ } else {
+ return new File[] {result};
+ }
+ }
+
+ protected static File asFile(String path) {
+ try {
+ URI uri = new URI(path);
+ if (uri.getScheme().equals("file")) {
+ return new File(uri.getPath());
+ } else {
+ throw new IllegalArgumentException("This path does not denote
a local file.");
+ }
+ } catch (URISyntaxException | NullPointerException e) {
+ throw new IllegalArgumentException(
+ "This path does not describe a valid local file URI.", e);
+ }
+ }
+
}
diff --git a/pom.xml b/pom.xml
index 8bfd3ee..9d5ae98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
<maven.compiler.target>${java.version}</maven.compiler.target>
<!-- Flink version -->
- <flink.version>1.15.3</flink.version>
+ <flink.version>1.16.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.8</scala.version>