This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new b3780481 [ISSUES #483] Fix worker source task commit offset FileNotFoundException
b3780481 is described below
commit b3780481a54cf9a6c7a80c03bf9f1fd4dc70732e
Author: zhoubo <[email protected]>
AuthorDate: Fri Apr 21 14:23:57 2023 +0800
[ISSUES #483] Fix worker source task commit offset FileNotFoundException
---
.../connect/runtime/utils/FileAndPropertyUtil.java | 25 ++++++++-------
.../runtime/utils/FileAndPropertyUtilTest.java | 37 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 12 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java
index 69ac1b22..d83bf53b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtil.java
@@ -37,21 +37,22 @@ public class FileAndPropertyUtil {
* @throws IOException
*/
public static void string2File(final String str, final String fileName)
throws IOException {
+ synchronized (fileName) {
+ String tmpFile = fileName + ".tmp";
+ string2FileNotSafe(str, tmpFile);
+
+ String bakFile = fileName + ".bak";
+ String prevContent = file2String(fileName);
+ if (prevContent != null) {
+ string2FileNotSafe(prevContent, bakFile);
+ }
- String tmpFile = fileName + ".tmp";
- string2FileNotSafe(str, tmpFile);
+ File file = new File(fileName);
+ file.delete();
- String bakFile = fileName + ".bak";
- String prevContent = file2String(fileName);
- if (prevContent != null) {
- string2FileNotSafe(prevContent, bakFile);
+ file = new File(tmpFile);
+ file.renameTo(new File(fileName));
}
-
- File file = new File(fileName);
- file.delete();
-
- file = new File(tmpFile);
- file.renameTo(new File(fileName));
}
public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java
index 50cec64e..58e352de 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/FileAndPropertyUtilTest.java
@@ -21,7 +21,13 @@ import org.junit.After;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
@@ -44,6 +50,37 @@ public class FileAndPropertyUtilTest {
assertEquals(str, s);
}
+ @Test
+ public void testMultiThreadString2File2String() {
+ CountDownLatch countDownLatch = new CountDownLatch(100);
+ List<Thread> threadList = new ArrayList<>();
+ AtomicInteger atomicInteger = new AtomicInteger(0);
+ for (int i = 0; i < 100; i++) {
+ final int n = i;
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String str1 = String.valueOf(n);
+ FileAndPropertyUtil.string2File(str1, filePath);
+ } catch (IOException e) {
+ atomicInteger.getAndIncrement();
+ throw new RuntimeException(e);
+ }
+ countDownLatch.countDown();
+ }
+ });
+ threadList.add(thread);
+ }
+ threadList.forEach(t -> t.start());
+ try {
+ countDownLatch.await(3, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ assertEquals(atomicInteger.get(), 0);
+ }
+
@Test
public void testString2FileNotSafe() throws Exception {
FileAndPropertyUtil.string2FileNotSafe(str, filePath);