This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 11d2470 [FLINK-24728][tests] Add tests to ensure SQL file sink closes
all created files
11d2470 is described below
commit 11d24708be32605243bc404679b17758c4e76e79
Author: tsreaper <[email protected]>
AuthorDate: Fri Dec 17 13:37:11 2021 +0800
[FLINK-24728][tests] Add tests to ensure SQL file sink closes all created
files
This closes #18073
---
.../org/apache/flink/testutils/TestFileSystem.java | 77 ++++++++++++++++++++--
.../planner/runtime/FileSystemITCaseBase.scala | 14 ++--
.../batch/sql/FileSystemTestCsvITCase.scala | 17 +++++
.../stream/sql/StreamFileSystemTestCsvITCase.scala | 17 +++++
4 files changed, 114 insertions(+), 11 deletions(-)
diff --git
a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
index 8a343f0..f04837a 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
@@ -19,15 +19,21 @@
package org.apache.flink.testutils;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
import org.apache.flink.core.fs.local.LocalFileStatus;
import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A test file system. This also has a service entry in the test resources, to
be loaded during
@@ -37,29 +43,45 @@ public class TestFileSystem extends LocalFileSystem {
public static final String SCHEME = "test";
- private static int streamOpenCounter;
+ // number of (input) streams opened
+ private static final AtomicInteger streamOpenCounter = new
AtomicInteger(0);
+
+ // current number of created but not yet closed (output) streams, per path
+ private static final Map<Path, Integer> currentUnclosedOutputStream = new
ConcurrentHashMap<>();
public static int getNumtimeStreamOpened() {
- return streamOpenCounter;
+ return streamOpenCounter.get();
}
public static void resetStreamOpenCounter() {
- streamOpenCounter = 0;
+ streamOpenCounter.set(0);
+ }
+
+ public static int getNumberOfUnclosedOutputStream(Path path) {
+ return currentUnclosedOutputStream.getOrDefault(path, 0);
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
- streamOpenCounter++;
+ streamOpenCounter.incrementAndGet();
return super.open(f, bufferSize);
}
@Override
public FSDataInputStream open(Path f) throws IOException {
- streamOpenCounter++;
+ streamOpenCounter.incrementAndGet();
return super.open(f);
}
@Override
+ public FSDataOutputStream create(final Path filePath, final WriteMode
overwrite)
+ throws IOException {
+ currentUnclosedOutputStream.compute(filePath, (k, v) -> v == null ? 1
: v + 1);
+ LocalDataOutputStream stream = (LocalDataOutputStream)
super.create(filePath, overwrite);
+ return new TestOutputStream(stream, filePath);
+ }
+
+ @Override
public FileStatus getFileStatus(Path f) throws IOException {
LocalFileStatus status = (LocalFileStatus) super.getFileStatus(f);
return new LocalFileStatus(status.getFile(), this);
@@ -82,6 +104,51 @@ public class TestFileSystem extends LocalFileSystem {
// ------------------------------------------------------------------------
+ private static final class TestOutputStream extends FSDataOutputStream {
+
+ private final LocalDataOutputStream stream;
+ private final Path path;
+
+ private TestOutputStream(LocalDataOutputStream stream, Path path) {
+ this.stream = stream;
+ this.path = path;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ stream.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ @Override
+ public void sync() throws IOException {
+ stream.sync();
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentUnclosedOutputStream.compute(
+ path, (k, v) -> Preconditions.checkNotNull(v) == 1 ? null
: v - 1);
+ stream.close();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
public static final class TestFileSystemFactory implements
FileSystemFactory {
@Override
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index d80e5b4..32a08d3 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -49,6 +49,8 @@ trait FileSystemITCaseBase {
def formatProperties(): Array[String] = Array()
+ def getScheme: String = "file"
+
def tableEnv: TableEnvironment
def check(sqlQuery: String, expectedResult: Seq[Row]): Unit
@@ -59,7 +61,7 @@ trait FileSystemITCaseBase {
}
def open(): Unit = {
- resultPath = fileTmpFolder.newFolder().toURI.toString
+ resultPath = fileTmpFolder.newFolder().toURI.getPath
BatchTableEnvUtil.registerCollection(
tableEnv,
"originalT",
@@ -76,7 +78,7 @@ trait FileSystemITCaseBase {
| c as b + 1
|) partitioned by (a, b) with (
| 'connector' = 'filesystem',
- | 'path' = '$resultPath',
+ | 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
@@ -90,7 +92,7 @@ trait FileSystemITCaseBase {
| b bigint
|) with (
| 'connector' = 'filesystem',
- | 'path' = '$resultPath',
+ | 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
@@ -102,7 +104,7 @@ trait FileSystemITCaseBase {
| x decimal(10, 0), y int
|) with (
| 'connector' = 'filesystem',
- | 'path' = '$resultPath',
+ | 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
@@ -114,7 +116,7 @@ trait FileSystemITCaseBase {
| x decimal(3, 2), y int
|) with (
| 'connector' = 'filesystem',
- | 'path' = '$resultPath',
+ | 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
@@ -256,7 +258,7 @@ trait FileSystemITCaseBase {
"partition(a='1', b='1') select x, y from originalT where a=1 and
b=1").await()
// create hidden partition dir
- assertTrue(new File(new Path(resultPath + "/a=1/.b=2").toUri).mkdir())
+ assertTrue(new File(new Path("file:" + resultPath +
"/a=1/.b=2").toUri).mkdir())
check(
"select x, y from partitionedTable",
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
index baf2cb7..0d768b6 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
@@ -18,6 +18,12 @@
package org.apache.flink.table.planner.runtime.batch.sql
+import org.apache.flink.core.fs.Path
+import org.apache.flink.testutils.TestFileSystem
+
+import org.junit.After
+import org.junit.Assert.assertEquals
+
/**
* Test for file system table factory with testcsv format.
*/
@@ -26,4 +32,15 @@ class FileSystemTestCsvITCase extends
BatchFileSystemITCaseBase {
override def formatProperties(): Array[String] = {
super.formatProperties() ++ Seq("'format' = 'testcsv'")
}
+
+ override def getScheme: String = "test"
+
+ @After
+ def close(): Unit = {
+ val path = new Path(resultPath)
+ assertEquals(
+ s"File $resultPath is not closed",
+ 0,
+ TestFileSystem.getNumberOfUnclosedOutputStream(path))
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
index 4ead229..a9a96c1 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
@@ -18,6 +18,12 @@
package org.apache.flink.table.planner.runtime.stream.sql
+import org.apache.flink.core.fs.Path
+import org.apache.flink.testutils.TestFileSystem
+
+import org.junit.After
+import org.junit.Assert.assertEquals
+
import scala.collection.Seq
/**
@@ -28,4 +34,15 @@ class StreamFileSystemTestCsvITCase extends
StreamFileSystemITCaseBase {
override def formatProperties(): Array[String] = {
super.formatProperties() ++ Seq("'format' = 'testcsv'")
}
+
+ override def getScheme: String = "test"
+
+ @After
+ def close(): Unit = {
+ val path = new Path(resultPath)
+ assertEquals(
+ s"File $resultPath is not closed",
+ 0,
+ TestFileSystem.getNumberOfUnclosedOutputStream(path))
+ }
}