This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a79f2e2f1b4 [FLINK-37031][state/forst] Bump forstjni to 0.1.5 && make ForStFlinkFileSystem thread safe" (#25927) a79f2e2f1b4 is described below commit a79f2e2f1b434d050cb1b2d75a909b6a313d9e07 Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Thu Jan 9 22:44:04 2025 +0800 [FLINK-37031][state/forst] Bump forstjni to 0.1.5 && make ForStFlinkFileSystem thread safe" (#25927) --- flink-dist/src/main/resources/META-INF/NOTICE | 2 +- .../flink-statebackend-forst/pom.xml | 2 +- .../flink/state/forst/ForStResourceContainer.java | 24 +++++----------- .../flink/state/forst/ForStStateDataTransfer.java | 7 +++-- .../flink/state/forst/fs/ForStFlinkFileSystem.java | 33 +++++++++++----------- .../forst/fs/filemapping/FileMappingManager.java | 15 +++------- .../restore/ForStIncrementalRestoreOperation.java | 5 ++-- .../state/forst/ForStStateBackendConfigTest.java | 3 +- .../state/forst/fs/FileMappingManagerTest.java | 15 ++++++++-- 9 files changed, 52 insertions(+), 54 deletions(-) diff --git a/flink-dist/src/main/resources/META-INF/NOTICE b/flink-dist/src/main/resources/META-INF/NOTICE index 4007febfb53..2d28859adaa 100644 --- a/flink-dist/src/main/resources/META-INF/NOTICE +++ b/flink-dist/src/main/resources/META-INF/NOTICE @@ -9,7 +9,7 @@ This project bundles the following dependencies under the Apache Software Licens - com.google.code.findbugs:jsr305:1.3.9 - com.twitter:chill-java:0.7.6 - com.ververica:frocksdbjni:8.10.0-ververica-beta-1.0 -- com.ververica:forstjni:0.1.4-beta +- com.ververica:forstjni:0.1.5 - commons-cli:commons-cli:1.5.0 - commons-collections:commons-collections:3.2.2 - commons-io:commons-io:2.15.1 diff --git a/flink-state-backends/flink-statebackend-forst/pom.xml b/flink-state-backends/flink-statebackend-forst/pom.xml index f6073c889e4..d3e5740fdef 100644 --- a/flink-state-backends/flink-statebackend-forst/pom.xml +++ b/flink-state-backends/flink-statebackend-forst/pom.xml @@ -63,7 +63,7 @@ under the License. <dependency> <groupId>com.ververica</groupId> <artifactId>forstjni</artifactId> - <version>0.1.4-beta</version> + <version>0.1.5</version> </dependency> <!-- test dependencies --> diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index 08ee1a7f7fa..3d025d5ebdd 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -78,8 +78,7 @@ public final class ForStResourceContainer implements AutoCloseable { // the filename length limit is 255 on most operating systems // In rocksdb, if db_log_dir is non empty, the log files will be in the specified dir, // and the db data dir's absolute path will be used as the log file name's prefix. - private static final int INSTANCE_PATH_LENGTH_LIMIT = - 255 / 2 - FORST_RELOCATE_LOG_SUFFIX.length(); + private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length(); @Nullable private FlinkEnv flinkEnv = null; @@ -396,9 +395,8 @@ public final class ForStResourceContainer implements AutoCloseable { } } - private void clearDirectories(Path basePath) throws IOException { - FileSystem fileSystem = - forStFileSystem != null ? forStFileSystem : basePath.getFileSystem(); + private static void clearDirectories(Path basePath) throws IOException { + FileSystem fileSystem = basePath.getFileSystem(); if (fileSystem.exists(basePath)) { fileSystem.delete(basePath, true); } @@ -485,19 +483,11 @@ public final class ForStResourceContainer implements AutoCloseable { String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR); if (logDir == null || logDir.isEmpty()) { - if (localForStPath == null - || localForStPath.getPath().length() <= INSTANCE_PATH_LENGTH_LIMIT) { + // only relocate db log dir in local mode + if (remoteForStPath == null + && localForStPath != null + && localForStPath.getPath().length() <= INSTANCE_PATH_LENGTH_LIMIT) { relocateDefaultDbLogDir(currentOptions); - } else if (remoteForStPath != null) { // log must put in local - Path relocatedPath = localForStPath.getParent().getParent(); - LOG.warn("ForSt remote path is not null, relocate log in {}.", relocatedPath); - currentOptions.setDbLogDir(relocatedPath.toString()); - } else { - // disable log relocate when instance path length exceeds limit to prevent ForSt - // log file creation failure, details in FLINK-31743 - LOG.warn( - "ForSt local path length exceeds limit : {}, disable log relocate.", - localForStPath); } } else { currentOptions.setDbLogDir(logDir); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java index a0877f5bb80..5a26776bc63 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.state.forst.fs.ForStFlinkFileSystem; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; @@ -38,6 +39,8 @@ import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -66,13 +69,13 @@ public class ForStStateDataTransfer implements Closeable { protected final ExecutorService executorService; - private final FileSystem forStFs; + @Nullable private final ForStFlinkFileSystem forStFs; public ForStStateDataTransfer(int threadNum) { this(threadNum, null); } - public ForStStateDataTransfer(int threadNum, FileSystem forStFs) { + public ForStStateDataTransfer(int threadNum, ForStFlinkFileSystem forStFs) { this.forStFs = forStFs; if (threadNum > 1) { executorService = diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java index 433f5f15c27..f7c0ec0fc3e 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -136,8 +136,8 @@ public class ForStFlinkFileSystem extends FileSystem { } @Override - public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode overwriteMode) - throws IOException { + public synchronized ByteBufferWritableFSDataOutputStream create( + Path path, WriteMode overwriteMode) throws IOException { FileMappingManager.RealPath realPath = fileMappingManager.createFile(path); if (realPath.isLocal) { return new ByteBufferWritableFSDataOutputStream( @@ -152,7 +152,8 @@ public class ForStFlinkFileSystem extends FileSystem { } @Override - public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) throws IOException { + public synchronized ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) + throws IOException { FileMappingManager.RealPath realPath = fileMappingManager.realPath(path); Preconditions.checkNotNull(realPath); if (realPath.isLocal) { @@ -176,7 +177,7 @@ public class ForStFlinkFileSystem extends FileSystem { } @Override - public ByteBufferReadableFSDataInputStream open(Path path) throws IOException { + public synchronized ByteBufferReadableFSDataInputStream open(Path path) throws IOException { FileMappingManager.RealPath realPath = fileMappingManager.realPath(path); Preconditions.checkNotNull(realPath); if (realPath.isLocal) { @@ -200,27 +201,27 @@ public class ForStFlinkFileSystem extends FileSystem { } @Override - public boolean rename(Path src, Path dst) throws IOException { + public synchronized boolean rename(Path src, Path dst) throws IOException { return fileMappingManager.renameFile(src.toString(), dst.toString()); } @Override - public Path getWorkingDirectory() { + public synchronized Path getWorkingDirectory() { return delegateFS.getWorkingDirectory(); } @Override - public Path getHomeDirectory() { + public synchronized Path getHomeDirectory() { return delegateFS.getHomeDirectory(); } @Override - public URI getUri() { + public synchronized URI getUri() { return delegateFS.getUri(); } @Override - public boolean exists(final Path f) throws IOException { + public synchronized boolean exists(final Path f) throws IOException { FileMappingManager.RealPath realPath = fileMappingManager.realPath(f); if (realPath == null) { return delegateFS.exists(f) && delegateFS.getFileStatus(f).isDir(); @@ -239,7 +240,7 @@ public class ForStFlinkFileSystem extends FileSystem { } @Override - public FileStatus getFileStatus(Path path) throws IOException { + public synchronized FileStatus getFileStatus(Path path) throws IOException { FileMappingManager.RealPath realPath = fileMappingManager.realPath(path); Preconditions.checkNotNull(realPath); if (realPath.isLocal) { @@ -249,7 +250,7 @@ public class ForStFlinkFileSystem extends FileSystem { } @Override - public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) + public synchronized BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { Path path = file.getPath(); FileMappingManager.RealPath realPath = fileMappingManager.realPath(path); @@ -262,7 +263,7 @@ public class ForStFlinkFileSystem extends FileSystem { } @Override - public FileStatus[] listStatus(Path path) throws IOException { + public synchronized FileStatus[] listStatus(Path path) throws IOException { // mapping files List<FileStatus> fileStatuses = new ArrayList<>(); String pathStr = path.toString(); @@ -281,7 +282,7 @@ public class ForStFlinkFileSystem extends FileSystem { } @Override - public boolean delete(Path path, boolean recursive) throws IOException { + public synchronized boolean delete(Path path, boolean recursive) throws IOException { boolean success = fileMappingManager.deleteFile(path, recursive); if (fileBasedCache != null) { // only new generated file will put into cache, no need to consider file mapping @@ -291,16 +292,16 @@ public class ForStFlinkFileSystem extends FileSystem { } @Override - public boolean mkdirs(Path path) throws IOException { + public synchronized boolean mkdirs(Path path) throws IOException { return delegateFS.mkdirs(path); } @Override - public boolean isDistributedFS() { + public synchronized boolean isDistributedFS() { return delegateFS.isDistributedFS(); } - public int link(Path src, Path dst) throws IOException { + public synchronized int link(Path src, Path dst) throws IOException { return fileMappingManager.link(src.toString(), dst.toString()); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java index d935a5e4d08..b54e4e0844d 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java @@ -70,7 +70,7 @@ public class FileMappingManager { public RealPath createFile(Path file) { String fileName = file.toString(); Preconditions.checkState(!mappingTable.containsKey(fileName)); - if (!fileName.endsWith(SST_SUFFIX) && fileName.startsWith(remoteBase)) { + if (!fileName.endsWith(SST_SUFFIX) && isParentDir(fileName, remoteBase)) { Path localFile = new Path(localBase, file.getName()); mappingTable.put( fileName, @@ -92,16 +92,9 @@ public class FileMappingManager { return -1; } MappingEntry sourceEntry = mappingTable.get(src); - if (sourceEntry != null) { - sourceEntry.retain(); - mappingTable.putIfAbsent(dst, sourceEntry); - } else { - sourceEntry = new MappingEntry(0, fileSystem, src, false, false); - sourceEntry.retain(); - mappingTable.put(src, sourceEntry); - sourceEntry.retain(); - mappingTable.put(dst, sourceEntry); - } + Preconditions.checkNotNull(sourceEntry); + sourceEntry.retain(); + mappingTable.putIfAbsent(dst, sourceEntry); LOG.trace("link: {} -> {}", dst, src); return 0; } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java index fc9a1051df1..ab8d76541da 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java @@ -253,9 +253,10 @@ public class ForStIncrementalRestoreOperation<K> implements ForStRestoreOperatio } private void transferAllStateHandles(List<StateHandleTransferSpec> specs) throws Exception { - FileSystem forStFs = getFileSystem(optionsContainer.getBasePath()); try (ForStStateDataTransfer transfer = - new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs)) { + new ForStStateDataTransfer( + ForStStateDataTransfer.DEFAULT_THREAD_NUM, + optionsContainer.getFileSystem())) { transfer.transferAllStateDataToDirectory(specs, cancelStreamRegistry); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java index b7b357f2ba7..f106fef31fe 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java @@ -93,7 +93,8 @@ public class ForStStateBackendConfigTest { final File logFile = File.createTempFile(getClass().getSimpleName() + "-", ".log"); // set the environment variable 'log.file' with the Flink log file location System.setProperty("log.file", logFile.getPath()); - try (ForStResourceContainer container = backend.createOptionsAndResourceContainer(null)) { + try (ForStResourceContainer container = + backend.createOptionsAndResourceContainer(new Path(tempFolder.toString()))) { assertEquals( ForStConfigurableOptions.LOG_LEVEL.defaultValue(), container.getDbOptions().infoLogLevel()); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java index 081e12d690c..77b24a0861b 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java @@ -43,6 +43,7 @@ public class FileMappingManagerTest { FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); os.write(233); os.close(); + fileMappingManager.createFile(new Path(src)); String dst = tempDir.toString() + "/dst"; fileMappingManager.link(src, dst); assertThat(fileMappingManager.realPath(new Path(dst)).path.toString()).isEqualTo(src); @@ -60,6 +61,7 @@ public class FileMappingManagerTest { FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); os.write(233); os.close(); + fileMappingManager.createFile(new Path(src)); String dstB = tempDir.toString() + "/b"; fileMappingManager.link(src, dstB); assertThat(fileMappingManager.realPath(new Path(dstB)).path.toString()).isEqualTo(src); @@ -87,6 +89,7 @@ public class FileMappingManagerTest { FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); os.write(233); os.close(); + fileMappingManager.createFile(new Path(src)); String dst = tempDir.toString() + "/dst"; fileMappingManager.link(src, dst); // delete src @@ -102,13 +105,15 @@ public class FileMappingManagerTest { void testDirectoryDelete() throws IOException { FileSystem localFS = FileSystem.getLocalFileSystem(); FileMappingManager fileMappingManager = - new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString()); + new FileMappingManager( + localFS, localFS, tempDir.toString() + "/db", tempDir.toString() + "/db"); String testDir = tempDir + "/testDir"; localFS.mkdirs(new Path(testDir)); String src = testDir + "/source"; FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); os.write(233); os.close(); + fileMappingManager.createFile(new Path(src)); String dst = tempDir.toString() + "/dst"; fileMappingManager.link(src, dst); @@ -127,13 +132,15 @@ public class FileMappingManagerTest { void testDirectoryRename() throws IOException { FileSystem localFS = FileSystem.getLocalFileSystem(); FileMappingManager fileMappingManager = - new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString()); + new FileMappingManager( + localFS, localFS, tempDir.toString() + "/db", tempDir.toString() + "/db"); String testDir = tempDir + "/testDir"; localFS.mkdirs(new Path(testDir)); String src = testDir + "/source"; FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); os.write(233); os.close(); + fileMappingManager.createFile(new Path(src)); String linkedDirTmp = tempDir.toString() + "/linkedDir.tmp"; localFS.mkdirs(new Path(linkedDirTmp)); @@ -175,13 +182,15 @@ public class FileMappingManagerTest { void testCreateFileBeforeRename() throws IOException { FileSystem localFS = FileSystem.getLocalFileSystem(); FileMappingManager fileMappingManager = - new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString()); + new FileMappingManager( + localFS, localFS, tempDir.toString() + "/db", tempDir.toString() + "/db"); String testDir = tempDir + "/testDir"; localFS.mkdirs(new Path(testDir)); String src = testDir + "/source"; FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE); os.write(233); os.close(); + fileMappingManager.createFile(new Path(src)); String linkedDirTmp = tempDir.toString() + "/linkedDir.tmp"; localFS.mkdirs(new Path(linkedDirTmp));