This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 11d2470  [FLINK-24728][tests] Add tests to ensure SQL file sink closes 
all created files
11d2470 is described below

commit 11d24708be32605243bc404679b17758c4e76e79
Author: tsreaper <[email protected]>
AuthorDate: Fri Dec 17 13:37:11 2021 +0800

    [FLINK-24728][tests] Add tests to ensure SQL file sink closes all created 
files
    
    This closes #18073
---
 .../org/apache/flink/testutils/TestFileSystem.java | 77 ++++++++++++++++++++--
 .../planner/runtime/FileSystemITCaseBase.scala     | 14 ++--
 .../batch/sql/FileSystemTestCsvITCase.scala        | 17 +++++
 .../stream/sql/StreamFileSystemTestCsvITCase.scala | 17 +++++
 4 files changed, 114 insertions(+), 11 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java 
b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
index 8a343f0..f04837a 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
@@ -19,15 +19,21 @@
 package org.apache.flink.testutils;
 
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
 import org.apache.flink.core.fs.local.LocalFileStatus;
 import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A test file system. This also has a service entry in the test resources, to 
be loaded during
@@ -37,29 +43,45 @@ public class TestFileSystem extends LocalFileSystem {
 
     public static final String SCHEME = "test";
 
-    private static int streamOpenCounter;
+    // number of (input) stream opened
+    private static final AtomicInteger streamOpenCounter = new 
AtomicInteger(0);
+
+    // current number of created, unclosed (output) stream
+    private static final Map<Path, Integer> currentUnclosedOutputStream = new 
ConcurrentHashMap<>();
 
     public static int getNumtimeStreamOpened() {
-        return streamOpenCounter;
+        return streamOpenCounter.get();
     }
 
     public static void resetStreamOpenCounter() {
-        streamOpenCounter = 0;
+        streamOpenCounter.set(0);
+    }
+
+    public static int getNumberOfUnclosedOutputStream(Path path) {
+        return currentUnclosedOutputStream.getOrDefault(path, 0);
     }
 
     @Override
     public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-        streamOpenCounter++;
+        streamOpenCounter.incrementAndGet();
         return super.open(f, bufferSize);
     }
 
     @Override
     public FSDataInputStream open(Path f) throws IOException {
-        streamOpenCounter++;
+        streamOpenCounter.incrementAndGet();
         return super.open(f);
     }
 
     @Override
+    public FSDataOutputStream create(final Path filePath, final WriteMode 
overwrite)
+            throws IOException {
+        currentUnclosedOutputStream.compute(filePath, (k, v) -> v == null ? 1 
: v + 1);
+        LocalDataOutputStream stream = (LocalDataOutputStream) 
super.create(filePath, overwrite);
+        return new TestOutputStream(stream, filePath);
+    }
+
+    @Override
     public FileStatus getFileStatus(Path f) throws IOException {
         LocalFileStatus status = (LocalFileStatus) super.getFileStatus(f);
         return new LocalFileStatus(status.getFile(), this);
@@ -82,6 +104,51 @@ public class TestFileSystem extends LocalFileSystem {
 
     // ------------------------------------------------------------------------
 
+    private static final class TestOutputStream extends FSDataOutputStream {
+
+        private final LocalDataOutputStream stream;
+        private final Path path;
+
+        private TestOutputStream(LocalDataOutputStream stream, Path path) {
+            this.stream = stream;
+            this.path = path;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return stream.getPos();
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            stream.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            stream.write(b, off, len);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            stream.flush();
+        }
+
+        @Override
+        public void sync() throws IOException {
+            stream.sync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            currentUnclosedOutputStream.compute(
+                    path, (k, v) -> Preconditions.checkNotNull(v) == 1 ? null 
: v - 1);
+            stream.close();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
     public static final class TestFileSystemFactory implements 
FileSystemFactory {
 
         @Override
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index d80e5b4..32a08d3 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -49,6 +49,8 @@ trait FileSystemITCaseBase {
 
   def formatProperties(): Array[String] = Array()
 
+  def getScheme: String = "file"
+
   def tableEnv: TableEnvironment
 
   def check(sqlQuery: String, expectedResult: Seq[Row]): Unit
@@ -59,7 +61,7 @@ trait FileSystemITCaseBase {
   }
 
   def open(): Unit = {
-    resultPath = fileTmpFolder.newFolder().toURI.toString
+    resultPath = fileTmpFolder.newFolder().toURI.getPath
     BatchTableEnvUtil.registerCollection(
       tableEnv,
       "originalT",
@@ -76,7 +78,7 @@ trait FileSystemITCaseBase {
          |  c as b + 1
          |) partitioned by (a, b) with (
          |  'connector' = 'filesystem',
-         |  'path' = '$resultPath',
+         |  'path' = '$getScheme://$resultPath',
          |  ${formatProperties().mkString(",\n")}
          |)
        """.stripMargin
@@ -90,7 +92,7 @@ trait FileSystemITCaseBase {
          |  b bigint
          |) with (
          |  'connector' = 'filesystem',
-         |  'path' = '$resultPath',
+         |  'path' = '$getScheme://$resultPath',
          |  ${formatProperties().mkString(",\n")}
          |)
        """.stripMargin
@@ -102,7 +104,7 @@ trait FileSystemITCaseBase {
          |  x decimal(10, 0), y int
          |) with (
          |  'connector' = 'filesystem',
-         |  'path' = '$resultPath',
+         |  'path' = '$getScheme://$resultPath',
          |  ${formatProperties().mkString(",\n")}
          |)
        """.stripMargin
@@ -114,7 +116,7 @@ trait FileSystemITCaseBase {
          |  x decimal(3, 2), y int
          |) with (
          |  'connector' = 'filesystem',
-         |  'path' = '$resultPath',
+         |  'path' = '$getScheme://$resultPath',
          |  ${formatProperties().mkString(",\n")}
          |)
        """.stripMargin
@@ -256,7 +258,7 @@ trait FileSystemITCaseBase {
       "partition(a='1', b='1') select x, y from originalT where a=1 and 
b=1").await()
 
     // create hidden partition dir
-    assertTrue(new File(new Path(resultPath + "/a=1/.b=2").toUri).mkdir())
+    assertTrue(new File(new Path("file:" + resultPath + 
"/a=1/.b=2").toUri).mkdir())
 
     check(
       "select x, y from partitionedTable",
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
index baf2cb7..0d768b6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala
@@ -18,6 +18,12 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql
 
+import org.apache.flink.core.fs.Path
+import org.apache.flink.testutils.TestFileSystem
+
+import org.junit.After
+import org.junit.Assert.assertEquals
+
 /**
   * Test for file system table factory with testcsv format.
   */
@@ -26,4 +32,15 @@ class FileSystemTestCsvITCase extends 
BatchFileSystemITCaseBase {
   override def formatProperties(): Array[String] = {
     super.formatProperties() ++ Seq("'format' = 'testcsv'")
   }
+
+  override def getScheme: String = "test"
+
+  @After
+  def close(): Unit = {
+    val path = new Path(resultPath)
+    assertEquals(
+      s"File $resultPath is not closed",
+      0,
+      TestFileSystem.getNumberOfUnclosedOutputStream(path))
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
index 4ead229..a9a96c1 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
@@ -18,6 +18,12 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
+import org.apache.flink.core.fs.Path
+import org.apache.flink.testutils.TestFileSystem
+
+import org.junit.After
+import org.junit.Assert.assertEquals
+
 import scala.collection.Seq
 
 /**
@@ -28,4 +34,15 @@ class StreamFileSystemTestCsvITCase extends 
StreamFileSystemITCaseBase {
   override def formatProperties(): Array[String] = {
     super.formatProperties() ++ Seq("'format' = 'testcsv'")
   }
+
+  override def getScheme: String = "test"
+
+  @After
+  def close(): Unit = {
+    val path = new Path(resultPath)
+    assertEquals(
+      s"File $resultPath is not closed",
+      0,
+      TestFileSystem.getNumberOfUnclosedOutputStream(path))
+  }
 }

Reply via email to