This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 62f3a47947 [core] lazily create FileChannelManagerImpl to avoid
creating unused dirs (#5591)
62f3a47947 is described below
commit 62f3a47947bd80c58e90057bef6cf50ed5a59a06
Author: Yujiang Zhong <[email protected]>
AuthorDate: Tue May 13 23:05:19 2025 +0800
[core] lazily create FileChannelManagerImpl to avoid creating unused dirs
(#5591)
---
.../paimon/crosspartition/GlobalIndexAssigner.java | 22 ++++++---
.../java/org/apache/paimon/disk/IOManagerImpl.java | 54 ++++++++++++++--------
2 files changed, 49 insertions(+), 27 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 5ed13b4fce..fa9f0464ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -62,11 +62,11 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.IntStream;
@@ -132,14 +132,22 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
this.extractor = new RowPartitionAllPrimaryKeyExtractor(table.schema());
this.keyPartExtractor = new KeyPartPartitionKeyExtractor(table.schema());
+ for (String tmpDir : ioManager.tempDirs()) {
+ File rocksDBDir = new File(tmpDir, "rocksdb-" + UUID.randomUUID());
+ if (rocksDBDir.mkdirs()) {
+ this.path = rocksDBDir;
+ break;
+ }
+ }
+
+ if (path == null) {
+ throw new RuntimeException(
+ "Failed to create RocksDB cache directory in temp dirs: "
+ + Arrays.toString(ioManager.tempDirs()));
+ }
+
// state
Options options = coreOptions.toConfiguration();
- String rocksDBDir =
- ioManager
- .tempDirs()[
- ThreadLocalRandom.current().nextInt(ioManager.tempDirs().length)];
- this.path = new File(rocksDBDir, "rocksdb-" + UUID.randomUUID());
-
Options rocksdbOptions = Options.fromMap(new HashMap<>(options.toMap()));
// we should avoid too small memory
long blockCache = Math.max(offHeapMemory, rocksdbOptions.get(BLOCK_CACHE_SIZE).getBytes());
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index 53f5090b37..a67dcfb8c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -41,7 +41,7 @@ public class IOManagerImpl implements IOManager {
private final String[] tempDirs;
- private final FileChannelManager fileChannelManager;
+ private volatile FileChannelManager lazyFileChannelManager;
// -------------------------------------------------------------------------
// Constructors / Destructors
@@ -53,37 +53,51 @@ public class IOManagerImpl implements IOManager {
* @param tempDirs The basic directories for files underlying anonymous channels.
*/
public IOManagerImpl(String... tempDirs) {
+ Preconditions.checkNotNull(tempDirs);
this.tempDirs = tempDirs;
- this.fileChannelManager =
- new FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX);
- if (LOG.isInfoEnabled()) {
- LOG.info(
- "Created a new {} for spilling of task related data to disk (joins, sorting, ...). Used directories:\n\t{}",
- FileChannelManager.class.getSimpleName(),
- getSpillingDirectoriesPathsString());
+ }
+
+ private FileChannelManager fileChannelManager() {
+ if (lazyFileChannelManager == null) {
+ synchronized (this) {
+ if (lazyFileChannelManager == null) {
+ this.lazyFileChannelManager =
+ new FileChannelManagerImpl(tempDirs, DIR_NAME_PREFIX);
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Created a new {} for spilling of task related data to disk (joins, sorting, ...). Used directories:\n\t{}",
+ FileChannelManager.class.getSimpleName(),
+ getSpillingDirectoriesPathsString());
+ }
+ }
+ }
}
+
+ return lazyFileChannelManager;
}
/** Removes all temporary files. */
@Override
public void close() throws Exception {
- fileChannelManager.close();
- if (LOG.isInfoEnabled()) {
- LOG.info(
- "Closed {} with directories:\n\t{}",
- FileChannelManager.class.getSimpleName(),
- getSpillingDirectoriesPathsString());
+ if (lazyFileChannelManager != null) {
+ lazyFileChannelManager.close();
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Closed {} with directories:\n\t{}",
+ FileChannelManager.class.getSimpleName(),
+ getSpillingDirectoriesPathsString());
+ }
}
}
@Override
public ID createChannel() {
- return fileChannelManager.createChannel();
+ return fileChannelManager().createChannel();
}
@Override
public ID createChannel(String prefix) {
- return fileChannelManager.createChannel(prefix);
+ return fileChannelManager().createChannel(prefix);
}
@Override
@@ -93,7 +107,7 @@ public class IOManagerImpl implements IOManager {
@Override
public Enumerator createChannelEnumerator() {
- return fileChannelManager.createChannelEnumerator();
+ return fileChannelManager().createChannelEnumerator();
}
/**
@@ -116,7 +130,7 @@ public class IOManagerImpl implements IOManager {
* @return The directories that the I/O manager spills to.
*/
public File[] getSpillingDirectories() {
- return fileChannelManager.getPaths();
+ return fileChannelManager().getPaths();
}
/**
@@ -125,7 +139,7 @@ public class IOManagerImpl implements IOManager {
* @return The directories that the I/O manager spills to, as path strings.
*/
public String[] getSpillingDirectoriesPaths() {
- File[] paths = fileChannelManager.getPaths();
+ File[] paths = fileChannelManager().getPaths();
String[] strings = new String[paths.length];
for (int i = 0; i < strings.length; i++) {
strings[i] = paths[i].getAbsolutePath();
@@ -134,7 +148,7 @@ public class IOManagerImpl implements IOManager {
}
private String getSpillingDirectoriesPathsString() {
- return Arrays.stream(fileChannelManager.getPaths())
+ return Arrays.stream(fileChannelManager().getPaths())
.map(File::getAbsolutePath)
.collect(Collectors.joining("\n\t"));
}