This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch BAHIR-308
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/BAHIR-308 by this push:
     new d23ffb9  [BAHIR-308] Bump flink version to 1.16.1
d23ffb9 is described below

commit d23ffb9a05c3bad0b8b8703ca21dd22fc8f79b9a
Author: Joao Boto <[email protected]>
AuthorDate: Wed Mar 8 17:21:45 2023 +0100

    [BAHIR-308] Bump flink version to 1.16.1
---
 .github/workflows/maven-ci.yml                     |   2 +-
 .../streaming/connectors/flume/FlumeSinkTest.java  |   8 +-
 .../connectors/redis/RedisDescriptorTest.java      |  27 ++--
 .../flink/streaming/siddhi/SiddhiCEPITCase.java    | 169 ++++++++++++++++++++-
 pom.xml                                            |   2 +-
 5 files changed, 184 insertions(+), 24 deletions(-)

diff --git a/.github/workflows/maven-ci.yml b/.github/workflows/maven-ci.yml
index ef344b9..6488728 100644
--- a/.github/workflows/maven-ci.yml
+++ b/.github/workflows/maven-ci.yml
@@ -31,7 +31,7 @@ jobs:
       fail-fast: false
       matrix:
        java: ['8', '11']
-       flink-version: ['1.15.3']
+       flink-version: ['1.16.1']
        connector: [
          'flink-connector-activemq',
          'flink-connector-akka',
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
index c212fdf..f59bde8 100644
--- a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
+++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
@@ -22,9 +22,7 @@ import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.junit.jupiter.api.Test;
 
-import java.nio.charset.Charset;
-
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import java.nio.charset.StandardCharsets;
 
 public class FlumeSinkTest extends FlumeServerTest {
 
@@ -35,7 +33,7 @@ public class FlumeSinkTest extends FlumeServerTest {
         FlumeEventBuilder<String> flumeEventBuilder = new FlumeEventBuilder<String>() {
             @Override
             public Event createFlumeEvent(String value, RuntimeContext ctx) {
-                return EventBuilder.withBody(value, Charset.forName("UTF-8"));
+                return EventBuilder.withBody(value, StandardCharsets.UTF_8);
             }
         };
 
@@ -43,7 +41,7 @@ public class FlumeSinkTest extends FlumeServerTest {
 
         environment.fromElements("string1", "string2").addSink(flumeSink);
 
-        tryExecute(environment, "FlumeTest");
+        environment.executeAsync("FlumeTest");
     }
 
 }
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
index abb1b2e..6754170 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.*;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.types.Row;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,7 +50,9 @@ public class RedisDescriptorTest extends RedisITCaseBase {
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
-        tableEnvironment.registerDataStream("t1", source, "k, v");
+
+        Table table = tableEnvironment.fromDataStream(source);
+        tableEnvironment.createTemporaryView("t1", table);
 
         /*Redis redis = new Redis()
                 .mode(RedisValidator.REDIS_CLUSTER)
@@ -57,17 +60,17 @@ public class RedisDescriptorTest extends RedisITCaseBase {
                 .ttl(100000)
                 .property(RedisValidator.REDIS_NODES, REDIS_HOST+ ":" + REDIS_PORT);*/
 
-        tableEnvironment.executeSql("create table redis " +
-                        "(k string, " +
-                        "v bigint) " +
-                        "with (" +
-                        "'connector.type'='redis'," +
-                        "'redis-mode'='cluster'," +
-                        "'cluster-nodes'='"+String.format("%s:%s",REDIS_HOST, REDIS_PORT)+"'," +
-                        "'command'='INCRBY_EX'," +
-                        "'key.ttl'='100000')");
-
-        tableEnvironment.executeSql("insert into redis select k, v from t1");
+        tableEnvironment.executeSql(
+                "create table redis (key string, number bigint) " +
+                "with (" +
+                "'connector.type'='redis'," +
+                "'redis-mode'='cluster'," +
+                "'cluster-nodes'='"+String.format("%s:%s",REDIS_HOST, REDIS_PORT)+"'," +
+                "'command'='INCRBY_EX'," +
+                "'key.ttl'='100000'" +
+                ")");
+
+        tableEnvironment.executeSql("insert into redis select * from t1");
     }
 
     @Test
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
index a0d9935..1422531 100755
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
@@ -41,14 +41,19 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Flink-siddhi library integration test cases
  */
@@ -402,4 +408,158 @@ public class SiddhiCEPITCase extends AbstractTestBase implements Serializable {
         output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
         env.execute();
     }
+
+    // All code below is migrated from TestBaseUtils.java (in version 1.15)
+    public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath)
+            throws Exception {
+        compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[0]);
+    }
+
+    public static void compareResultsByLinesInMemory(
+            String expectedResultStr, String resultPath, String[] excludePrefixes)
+            throws Exception {
+
+        ArrayList<String> list = new ArrayList<>();
+        readAllResultLines(list, resultPath, excludePrefixes, false);
+
+        String[] result = list.toArray(new String[list.size()]);
+        Arrays.sort(result);
+
+        String[] expected =
+                expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
+        Arrays.sort(expected);
+
+        if (expected.length != result.length || !Arrays.deepEquals(expected, result)) {
+            String msg =
+                    String.format(
+                            "Different elements in arrays: expected %d elements and received %d\n"
+                                    + "files: %s\n expected: %s\n received: %s",
+                            expected.length,
+                            result.length,
+                            Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)),
+                            Arrays.toString(expected),
+                            Arrays.toString(result));
+            fail(msg);
+        }
+    }
+
+    public static void readAllResultLines(List<String> target, String resultPath)
+            throws IOException {
+        readAllResultLines(target, resultPath, new String[] {});
+    }
+
+    public static void readAllResultLines(
+            List<String> target, String resultPath, String[] excludePrefixes) throws IOException {
+
+        readAllResultLines(target, resultPath, excludePrefixes, false);
+    }
+
+    public static void readAllResultLines(
+            List<String> target,
+            String resultPath,
+            String[] excludePrefixes,
+            boolean inOrderOfFiles)
+            throws IOException {
+
+        checkArgument(resultPath != null, "resultPath cannot be null");
+
+        final BufferedReader[] readers =
+                getResultReader(resultPath, excludePrefixes, inOrderOfFiles);
+        try {
+            for (BufferedReader reader : readers) {
+                String s;
+                while ((s = reader.readLine()) != null) {
+                    target.add(s);
+                }
+            }
+        } finally {
+            for (BufferedReader reader : readers) {
+                org.apache.flink.util.IOUtils.closeQuietly(reader);
+            }
+        }
+    }
+
+    public static BufferedReader[] getResultReader(
+            String resultPath, String[] excludePrefixes, boolean inOrderOfFiles)
+            throws IOException {
+
+        File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
+
+        if (inOrderOfFiles) {
+            // sort the files after their name (1, 2, 3, 4)...
+            // we cannot sort by path, because strings sort by prefix
+            Arrays.sort(
+                    files,
+                    new Comparator<File>() {
+
+                        @Override
+                        public int compare(File o1, File o2) {
+                            try {
+                                int f1 = Integer.parseInt(o1.getName());
+                                int f2 = Integer.parseInt(o2.getName());
+                                return f1 < f2 ? -1 : (f1 > f2 ? 1 : 0);
+                            } catch (NumberFormatException e) {
+                                throw new RuntimeException(
+                                        "The file names are not numbers and cannot be ordered: "
+                                                + o1.getName()
+                                                + "/"
+                                                + o2.getName());
+                            }
+                        }
+                    });
+        }
+
+        BufferedReader[] readers = new BufferedReader[files.length];
+        for (int i = 0; i < files.length; i++) {
+            readers[i] =
+                    new BufferedReader(
+                            new InputStreamReader(
+                                    new FileInputStream(files[i]), StandardCharsets.UTF_8));
+        }
+        return readers;
+    }
+
+    private static File[] getAllInvolvedFiles(String resultPath, final String[] excludePrefixes) {
+        final File result = asFile(resultPath);
+        assertTrue("Result file was not written", result.exists());
+
+        if (result.isDirectory()) {
+            try {
+                return Files.walk(result.toPath())
+                        .filter(Files::isRegularFile)
+                        .filter(
+                                path -> {
+                                    for (String prefix : excludePrefixes) {
+                                        if (path.getFileName().toString().startsWith(prefix)) {
+                                            return false;
+                                        }
+                                    }
+
+                                    return true;
+                                })
+                        .map(Path::toFile)
+                        .filter(file -> !file.isHidden())
+                        .toArray(File[]::new);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to retrieve result files", e);
+            }
+        } else {
+            return new File[] {result};
+        }
+    }
+
+    protected static File asFile(String path) {
+        try {
+            URI uri = new URI(path);
+            if (uri.getScheme().equals("file")) {
+                return new File(uri.getPath());
+            } else {
+                throw new IllegalArgumentException("This path does not denote a local file.");
+            }
+        } catch (URISyntaxException | NullPointerException e) {
+            throw new IllegalArgumentException(
+                    "This path does not describe a valid local file URI.", e);
+        }
+    }
+
 }
diff --git a/pom.xml b/pom.xml
index 8bfd3ee..9d5ae98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
         <maven.compiler.target>${java.version}</maven.compiler.target>
 
         <!-- Flink version -->
-        <flink.version>1.15.3</flink.version>
+        <flink.version>1.16.1</flink.version>
         <scala.binary.version>2.12</scala.binary.version>
         <scala.version>2.12.8</scala.version>
 
