This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 354fc4b44a [Fix] Fix `FileUtils::createNewFile` not create new file
(#5943)
354fc4b44a is described below
commit 354fc4b44a2816eaa480fb8123b62e476abedbcc
Author: Jia Fan <[email protected]>
AuthorDate: Tue Dec 12 15:32:12 2023 +0800
[Fix] Fix `FileUtils::createNewFile` not create new file (#5943)
---
.../apache/seatunnel/common/utils/FileUtils.java | 3 +-
.../seatunnel/common/utils/FileUtilsTest.java | 25 +++++++++++++++++
.../engine/e2e/ClusterFaultToleranceIT.java | 32 ++++++++--------------
.../e2e/ClusterFaultToleranceTwoPipelineIT.java | 25 +++++++----------
.../org/apache/seatunnel/engine/e2e/TestUtils.java | 4 ++-
.../apache/seatunnel/engine/e2e/TextHeaderIT.java | 8 +++---
6 files changed, 56 insertions(+), 41 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
index f606032380..820cdf6bde 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
@@ -101,7 +101,7 @@ public class FileUtils {
*
* @param filePath filePath
*/
- public static void createNewFile(String filePath) {
+ public static void createNewFile(String filePath) throws IOException {
File file = new File(filePath);
if (file.exists()) {
file.delete();
@@ -110,6 +110,7 @@ public class FileUtils {
if (!file.getParentFile().exists()) {
createParentFile(file);
}
+ file.createNewFile();
}
/**
diff --git
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/FileUtilsTest.java
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/FileUtilsTest.java
index 8770ef6230..c773678a69 100644
---
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/FileUtilsTest.java
+++
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/FileUtilsTest.java
@@ -26,6 +26,8 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
public class FileUtilsTest {
@Test
@@ -78,4 +80,27 @@ public class FileUtilsTest {
}
}
}
+
+ @Test
+ public void createNewFile() throws IOException {
+ // create new file
+ FileUtils.createNewFile("/tmp/test.txt");
+ Assertions.assertEquals("",
FileUtils.readFileToStr(Paths.get("/tmp/test.txt")));
+
+ // delete exist file and create new file
+ FileUtils.writeStringToFile("/tmp/test2.txt", "test");
+ Path test2 = Paths.get("/tmp/test2.txt");
+ Assertions.assertEquals("test", FileUtils.readFileToStr(test2).trim());
+ FileUtils.createNewFile("/tmp/test2.txt");
+ Assertions.assertEquals("", FileUtils.readFileToStr(test2));
+
+ // create new file with not exist folder
+ FileUtils.createNewFile("/tmp/newfolder/test.txt");
+ Assertions.assertEquals("",
FileUtils.readFileToStr(Paths.get("/tmp/newfolder/test.txt")));
+
+
FileUtils.createNewFile("/tmp/newfolder/newfolder2/newfolde3/test.txt");
+ Assertions.assertEquals(
+ "",
+
FileUtils.readFileToStr(Paths.get("/tmp/newfolder/newfolder2/newfolde3/test.txt")));
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index f2f5d5c5c8..747dee9478 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -44,10 +44,10 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -69,7 +69,7 @@ public class ClusterFaultToleranceIT {
public static final String DYNAMIC_TEST_PARALLELISM =
"dynamic_test_parallelism";
@Test
- public void testBatchJobRunOkIn2Node() throws ExecutionException,
InterruptedException {
+ public void testBatchJobRunOkIn2Node() throws Exception {
String testCaseName = "testBatchJobRunOkIn2Node";
String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRunOkIn2Node";
long testRowNumber = 1000;
@@ -161,10 +161,8 @@ public class ClusterFaultToleranceIT {
* @param parallelism FakeSource parallelism
*/
private ImmutablePair<String, String> createTestResources(
- @NonNull String testCaseName,
- @NonNull JobMode jobMode,
- long rowNumber,
- int parallelism) {
+ @NonNull String testCaseName, @NonNull JobMode jobMode, long
rowNumber, int parallelism)
+ throws IOException {
checkArgument(rowNumber > 0, "rowNumber must greater than 0");
checkArgument(parallelism > 0, "parallelism must greater than 0");
Map<String, String> valueMap = new HashMap<>();
@@ -194,7 +192,7 @@ public class ClusterFaultToleranceIT {
}
@Test
- public void testStreamJobRunOkIn2Node() throws ExecutionException,
InterruptedException {
+ public void testStreamJobRunOkIn2Node() throws Exception {
String testCaseName = "testStreamJobRunOkIn2Node";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRunOkIn2Node";
long testRowNumber = 1000;
@@ -287,8 +285,7 @@ public class ClusterFaultToleranceIT {
}
@Test
- public void testBatchJobRestoreIn2NodeWorkerDown()
- throws ExecutionException, InterruptedException {
+ public void testBatchJobRestoreIn2NodeWorkerDown() throws Exception {
String testCaseName = "testBatchJobRestoreIn2NodeWorkerDown";
String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeWorkerDown";
long testRowNumber = 1000;
@@ -387,8 +384,7 @@ public class ClusterFaultToleranceIT {
}
@Test
- public void testStreamJobRestoreIn2NodeWorkerDown()
- throws ExecutionException, InterruptedException {
+ public void testStreamJobRestoreIn2NodeWorkerDown() throws Exception {
String testCaseName = "testStreamJobRestoreIn2NodeWorkerDown";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeWorkerDown";
long testRowNumber = 1000;
@@ -506,8 +502,7 @@ public class ClusterFaultToleranceIT {
}
@Test
- public void testBatchJobRestoreIn2NodeMasterDown()
- throws ExecutionException, InterruptedException {
+ public void testBatchJobRestoreIn2NodeMasterDown() throws Exception {
String testCaseName = "testBatchJobRestoreIn2NodeMasterDown";
String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeMasterDown";
long testRowNumber = 1000;
@@ -609,8 +604,7 @@ public class ClusterFaultToleranceIT {
}
@Test
- public void testStreamJobRestoreIn2NodeMasterDown()
- throws ExecutionException, InterruptedException {
+ public void testStreamJobRestoreIn2NodeMasterDown() throws Exception {
String testCaseName = "testStreamJobRestoreIn2NodeMasterDown";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeMasterDown";
long testRowNumber = 1000;
@@ -730,15 +724,14 @@ public class ClusterFaultToleranceIT {
@Test
@Disabled
- public void testFor() throws ExecutionException, InterruptedException {
+ public void testFor() throws Exception {
for (int i = 0; i < 200; i++) {
testStreamJobRestoreInAllNodeDown();
}
}
@Test
- public void testStreamJobRestoreInAllNodeDown()
- throws ExecutionException, InterruptedException {
+ public void testStreamJobRestoreInAllNodeDown() throws Exception {
String testCaseName = "testStreamJobRestoreInAllNodeDown";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreInAllNodeDown_"
@@ -938,8 +931,7 @@ public class ClusterFaultToleranceIT {
@Test
@Disabled
- public void testStreamJobRestoreFromOssInAllNodeDown()
- throws ExecutionException, InterruptedException {
+ public void testStreamJobRestoreFromOssInAllNodeDown() throws Exception {
String OSS_BUCKET_NAME = "oss://your bucket name/";
String OSS_ENDPOINT = "your oss endpoint";
String OSS_ACCESS_KEY_ID = "oss accessKey id";
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index 8ac38aed02..d9b8ddcedf 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -42,10 +42,10 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -70,8 +70,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
public static final String DYNAMIC_TEST_PARALLELISM =
"dynamic_test_parallelism";
@Test
- public void testTwoPipelineBatchJobRunOkIn2Node()
- throws ExecutionException, InterruptedException {
+ public void testTwoPipelineBatchJobRunOkIn2Node() throws Exception {
String testCaseName = "testTwoPipelineBatchJobRunOkIn2Node";
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRunOkIn2Node";
@@ -171,7 +170,8 @@ public class ClusterFaultToleranceTwoPipelineIT {
@NonNull JobMode jobMode,
long rowNumber,
int parallelism,
- @NonNull String templateFileName) {
+ @NonNull String templateFileName)
+ throws IOException {
checkArgument(rowNumber > 0, "rowNumber must greater than 0");
checkArgument(parallelism > 0, "parallelism must greater than 0");
Map<String, String> valueMap = new HashMap<>();
@@ -201,8 +201,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
}
@Test
- public void testTwoPipelineStreamJobRunOkIn2Node()
- throws ExecutionException, InterruptedException {
+ public void testTwoPipelineStreamJobRunOkIn2Node() throws Exception {
String testCaseName = "testTwoPipelineStreamJobRunOkIn2Node";
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRunOkIn2Node";
@@ -300,8 +299,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
}
@Test
- public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown()
- throws ExecutionException, InterruptedException {
+ public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown() throws
Exception {
String testCaseName =
"testTwoPipelineBatchJobRestoreIn2NodeWorkerDown";
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeWorkerDown";
@@ -409,15 +407,14 @@ public class ClusterFaultToleranceTwoPipelineIT {
@Test
@Disabled
- public void testFor() throws ExecutionException, InterruptedException {
+ public void testFor() throws Exception {
for (int i = 0; i < 200; i++) {
testTwoPipelineStreamJobRestoreIn2NodeMasterDown();
}
}
@Test
- public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown()
- throws ExecutionException, InterruptedException {
+ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() throws
Exception {
String testCaseName =
"testTwoPipelineStreamJobRestoreIn2NodeWorkerDown";
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeWorkerDown";
@@ -545,8 +542,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
}
@Test
- public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown()
- throws ExecutionException, InterruptedException {
+ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() throws
Exception {
String testCaseName =
"testTwoPipelineBatchJobRestoreIn2NodeMasterDown" +
System.currentTimeMillis();
String testClusterName =
@@ -656,8 +652,7 @@ public class ClusterFaultToleranceTwoPipelineIT {
}
@Test
- public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown()
- throws ExecutionException, InterruptedException {
+ public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() throws
Exception {
String testCaseName =
"testTwoPipelineStreamJobRestoreIn2NodeMasterDown" +
System.currentTimeMillis();
String testClusterName =
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
index 1653241402..d88c7c5ee8 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
@@ -24,6 +24,7 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Paths;
import java.util.Map;
@@ -54,7 +55,8 @@ public class TestUtils {
public static void createTestConfigFileFromTemplate(
@NonNull String templateFile,
@NonNull Map<String, String> valueMap,
- @NonNull String targetFilePath) {
+ @NonNull String targetFilePath)
+ throws IOException {
String templateFilePath = getResource(templateFile);
String confContent =
FileUtils.readFileToStr(Paths.get(templateFilePath));
String targetConfContent = VariablesSubstitute.substitute(confContent,
valueMap);
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
index 8aa28599cb..09a0cc472a 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
@@ -44,12 +44,12 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@@ -92,14 +92,14 @@ public class TextHeaderIT {
try {
enableWriteHeader(
t.getFileStyle(), t.getEnableWriteHeader(),
t.getHeaderName());
- } catch (ExecutionException | InterruptedException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
});
}
public void enableWriteHeader(String file_format_type, String headerWrite,
String headerContent)
- throws ExecutionException, InterruptedException {
+ throws Exception {
String testClusterName =
"ClusterFaultToleranceIT_EnableWriteHeaderNode";
HazelcastInstanceImpl node1 = null;
SeaTunnelClient engineClient = null;
@@ -170,7 +170,7 @@ public class TextHeaderIT {
}
private ImmutablePair<String, String> createTestResources(
- @NonNull String headerWrite, @NonNull String formatType) {
+ @NonNull String headerWrite, @NonNull String formatType) throws
IOException {
Map<String, String> valueMap = new HashMap<>();
valueMap.put(ENABLE_HEADER_WRITE, headerWrite);
valueMap.put(FILE_FORMAT_TYPE, formatType);