This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 371fa748ce [hotfix] Pick one dir randomly in GlobalIndexAssigner
371fa748ce is described below
commit 371fa748ce7b03ab12012656c8e39e20062439bb
Author: JingsongLi <[email protected]>
AuthorDate: Tue May 13 23:13:04 2025 +0800
[hotfix] Pick one dir randomly in GlobalIndexAssigner
---
.../paimon/crosspartition/GlobalIndexAssigner.java | 13 ++----
.../java/org/apache/paimon/disk/IOManagerImpl.java | 48 +++++++++++++---------
2 files changed, 32 insertions(+), 29 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index fa9f0464ae..892e27e966 100644
---
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -72,6 +72,7 @@ import java.util.function.Function;
import java.util.stream.IntStream;
import static org.apache.paimon.lookup.RocksDBOptions.BLOCK_CACHE_SIZE;
+import static org.apache.paimon.utils.ListUtils.pickRandomly;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Assign UPDATE_BEFORE and bucket for the input record, output record with
bucket. */
@@ -132,15 +133,9 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
this.extractor = new
RowPartitionAllPrimaryKeyExtractor(table.schema());
this.keyPartExtractor = new
KeyPartPartitionKeyExtractor(table.schema());
- for (String tmpDir : ioManager.tempDirs()) {
- File rocksDBDir = new File(tmpDir, "rocksdb-" + UUID.randomUUID());
- if (rocksDBDir.mkdirs()) {
- this.path = rocksDBDir;
- break;
- }
- }
-
- if (path == null) {
+ String tmpDir = pickRandomly(Arrays.asList(ioManager.tempDirs()));
+ this.path = new File(tmpDir, "rocksdb-" + UUID.randomUUID());
+ if (!this.path.mkdirs()) {
throw new RuntimeException(
"Failed to create RocksDB cache directory in temp dirs: "
+ Arrays.toString(ioManager.tempDirs()));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index a67dcfb8c4..c0858dbf28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -41,7 +41,7 @@ public class IOManagerImpl implements IOManager {
private final String[] tempDirs;
- private volatile FileChannelManager lazyFileChannelManager;
+ private volatile FileChannelManager lazyChannelManager;
//
-------------------------------------------------------------------------
// Constructors / Destructors
@@ -58,35 +58,43 @@ public class IOManagerImpl implements IOManager {
}
private FileChannelManager fileChannelManager() {
- if (lazyFileChannelManager == null) {
+ if (lazyChannelManager == null) {
synchronized (this) {
- if (lazyFileChannelManager == null) {
- this.lazyFileChannelManager =
- new FileChannelManagerImpl(tempDirs,
DIR_NAME_PREFIX);
- if (LOG.isInfoEnabled()) {
- LOG.info(
- "Created a new {} for spilling of task related
data to disk (joins, sorting, ...). Used directories:\n\t{}",
- FileChannelManager.class.getSimpleName(),
- getSpillingDirectoriesPathsString());
- }
+ if (lazyChannelManager == null) {
+ lazyChannelManager = createFileChannelManager();
}
}
}
- return lazyFileChannelManager;
+ return lazyChannelManager;
}
/** Removes all temporary files. */
@Override
public void close() throws Exception {
- if (lazyFileChannelManager != null) {
- lazyFileChannelManager.close();
- if (LOG.isInfoEnabled()) {
- LOG.info(
- "Closed {} with directories:\n\t{}",
- FileChannelManager.class.getSimpleName(),
- getSpillingDirectoriesPathsString());
- }
+ if (lazyChannelManager != null) {
+ closeFileChannelManager(lazyChannelManager);
+ }
+ }
+
+ private FileChannelManager createFileChannelManager() {
+ FileChannelManager channelManager = new
FileChannelManagerImpl(tempDirs, DIR_NAME_PREFIX);
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Created a new {} for spilling of task related data to
disk (joins, sorting, ...). Used directories:\n\t{}",
+ FileChannelManager.class.getSimpleName(),
+ getSpillingDirectoriesPathsString());
+ }
+ return channelManager;
+ }
+
+ private void closeFileChannelManager(FileChannelManager
fileChannelManager) throws Exception {
+ fileChannelManager.close();
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Closed {} with directories:\n\t{}",
+ FileChannelManager.class.getSimpleName(),
+ getSpillingDirectoriesPathsString());
}
}