This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5882e101 [MINOR] Removed unused methods and variable (#702)
5882e101 is described below
commit 5882e10122ffd9b2d1bfb39630f32c643cf7ef0d
Author: jokercurry <[email protected]>
AuthorDate: Sat Mar 11 00:19:55 2023 +0800
[MINOR] Removed unused methods and variable (#702)
### What changes were proposed in this pull request?
1. Removed unused methods and variables.
2. Modified the return type of some methods.
### Why are the changes needed?
Clean up code.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests (UTs).
---
.../server/storage/LocalStorageManager.java | 17 ++----
.../uniffle/storage/common/LocalStorage.java | 68 +---------------------
.../uniffle/storage/common/LocalStorageMeta.java | 46 ++++-----------
.../uniffle/storage/common/LocalStorageTest.java | 8 ---
4 files changed, 16 insertions(+), 123 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 53f1a7ef..a8403d0b 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -35,6 +34,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -90,7 +90,6 @@ public class LocalStorageManager extends SingleStorageManager
{
throw new IllegalArgumentException("Base path dirs must not be empty");
}
this.partitionsOfStorage = Maps.newConcurrentMap();
- long shuffleExpiredTimeoutMs =
conf.get(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS);
long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY);
double ratio = conf.getDouble(ShuffleServerConf.DISK_CAPACITY_RATIO);
double highWaterMarkOfWrite =
conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE);
@@ -121,7 +120,6 @@ public class LocalStorageManager extends
SingleStorageManager {
.ratio(ratio)
.lowWaterMarkOfWrite(lowWaterMarkOfWrite)
.highWaterMarkOfWrite(highWaterMarkOfWrite)
- .shuffleExpiredTimeoutMs(shuffleExpiredTimeoutMs)
.localStorageMedia(storageType)
.build();
successCount.incrementAndGet();
@@ -216,8 +214,7 @@ public class LocalStorageManager extends
SingleStorageManager {
int shuffleId = event.getShuffleId();
int partitionId = event.getStartPartition();
- LocalStorage storage = partitionsOfStorage.get(UnionKey.buildKey(appId,
shuffleId, partitionId));
- return storage;
+ return partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId,
partitionId));
}
@Override
@@ -264,7 +261,7 @@ public class LocalStorageManager extends
SingleStorageManager {
}
return paths.stream();
} else {
- return Arrays.asList(basicPath).stream();
+ return Stream.of(basicPath);
}
}).collect(Collectors.toList());
@@ -293,13 +290,7 @@ public class LocalStorageManager extends
SingleStorageManager {
}
private <K, V> void deleteElement(Map<K, V> map, Function<K, Boolean>
deleteConditionFunc) {
- Iterator<Map.Entry<K, V>> iterator = map.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<K, V> entry = iterator.next();
- if (deleteConditionFunc.apply(entry.getKey())) {
- iterator.remove();
- }
- }
+ map.entrySet().removeIf(entry ->
deleteConditionFunc.apply(entry.getKey()));
}
@Override
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 44c08082..90a147f0 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -23,12 +23,10 @@ import java.nio.file.FileStore;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.List;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Queues;
import org.apache.commons.io.FileUtils;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
@@ -51,14 +49,9 @@ public class LocalStorage extends AbstractStorage {
private long capacity;
private final String basePath;
private final String mountPoint;
- private final double cleanupThreshold;
- private final long cleanIntervalMs;
private final double highWaterMarkOfWrite;
private final double lowWaterMarkOfWrite;
- private final long shuffleExpiredTimeoutMs;
- private final Queue<String> expiredShuffleKeys =
Queues.newLinkedBlockingQueue();
-
- private LocalStorageMeta metaData = new LocalStorageMeta();
+ private final LocalStorageMeta metaData = new LocalStorageMeta();
private final StorageMedia media;
private boolean isSpaceEnough = true;
private volatile boolean isCorrupted = false;
@@ -66,12 +59,9 @@ public class LocalStorage extends AbstractStorage {
private LocalStorage(Builder builder) {
this.basePath = builder.basePath;
- this.cleanupThreshold = builder.cleanupThreshold;
- this.cleanIntervalMs = builder.cleanIntervalMs;
this.highWaterMarkOfWrite = builder.highWaterMarkOfWrite;
this.lowWaterMarkOfWrite = builder.lowWaterMarkOfWrite;
this.capacity = builder.capacity;
- this.shuffleExpiredTimeoutMs = builder.shuffleExpiredTimeoutMs;
this.media = builder.media;
File baseFolder = new File(basePath);
@@ -194,16 +184,6 @@ public class LocalStorage extends AbstractStorage {
metaData.prepareStartRead(key);
}
- public boolean isShuffleLongTimeNotRead(String shuffleKey) {
- if (metaData.getShuffleLastReadTs(shuffleKey) == -1) {
- return false;
- }
- if (System.currentTimeMillis() - metaData.getShuffleLastReadTs(shuffleKey)
> shuffleExpiredTimeoutMs) {
- return true;
- }
- return false;
- }
-
public void updateShuffleLastReadTs(String shuffleKey) {
metaData.updateShuffleLastReadTs(shuffleKey);
}
@@ -238,18 +218,6 @@ public class LocalStorage extends AbstractStorage {
return media;
}
- public double getHighWaterMarkOfWrite() {
- return highWaterMarkOfWrite;
- }
-
- public double getLowWaterMarkOfWrite() {
- return lowWaterMarkOfWrite;
- }
-
- public void addExpiredShuffleKey(String shuffleKey) {
- expiredShuffleKeys.offer(shuffleKey);
- }
-
// This is the only place to remove shuffle metadata, clean and gc thread
may remove
// the shuffle metadata concurrently or serially. Force uploader thread may
update the
// shuffle size so gc thread must acquire write lock before updating disk
size, and force
@@ -271,13 +239,11 @@ public class LocalStorage extends AbstractStorage {
shuffleKey, metaData.getDiskSize(),
metaData.getShuffleMetaSet().size());
} catch (Exception e) {
LOG.error("Fail to update disk size", e);
- expiredShuffleKeys.offer(shuffleKey);
} finally {
lock.writeLock().unlock();
}
} else {
LOG.info("Fail to get write lock of {}, add it back to expired shuffle
queue", shuffleKey);
- expiredShuffleKeys.offer(shuffleKey);
}
}
@@ -293,20 +259,6 @@ public class LocalStorage extends AbstractStorage {
return metaData.getSortedShuffleKeys(checkRead, num);
}
- public Set<String> getShuffleMetaSet() {
- return metaData.getShuffleMetaSet();
- }
-
- public void removeShuffle(String shuffleKey, long size, List<Integer>
partitions) {
- metaData.removeShufflePartitionList(shuffleKey, partitions);
- metaData.updateDiskSize(-size);
- metaData.updateShuffleSize(shuffleKey, -size);
- }
-
- public Queue<String> getExpiredShuffleKeys() {
- return expiredShuffleKeys;
- }
-
public boolean isCorrupted() {
return isCorrupted;
}
@@ -340,10 +292,7 @@ public class LocalStorage extends AbstractStorage {
private double ratio;
private double lowWaterMarkOfWrite;
private double highWaterMarkOfWrite;
- private double cleanupThreshold;
private String basePath;
- private long cleanIntervalMs;
- private long shuffleExpiredTimeoutMs;
private StorageMedia media;
private Builder() {
@@ -369,26 +318,11 @@ public class LocalStorage extends AbstractStorage {
return this;
}
- public Builder cleanupThreshold(double cleanupThreshold) {
- this.cleanupThreshold = cleanupThreshold;
- return this;
- }
-
public Builder highWaterMarkOfWrite(double highWaterMarkOfWrite) {
this.highWaterMarkOfWrite = highWaterMarkOfWrite;
return this;
}
- public Builder cleanIntervalMs(long cleanIntervalMs) {
- this.cleanIntervalMs = cleanIntervalMs;
- return this;
- }
-
- public Builder shuffleExpiredTimeoutMs(long shuffleExpiredTimeoutMs) {
- this.shuffleExpiredTimeoutMs = shuffleExpiredTimeoutMs;
- return this;
- }
-
public Builder localStorageMedia(StorageMedia media) {
this.media = media;
return this;
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index dc57a0b6..71b4185d 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -93,18 +93,22 @@ public class LocalStorageMeta {
return shuffleMeta == null ? 0 : shuffleMeta.getNotUploadedSize();
}
- public long updateDiskSize(long delta) {
- return size.addAndGet(delta);
+ public void updateDiskSize(long delta) {
+ size.addAndGet(delta);
}
- public long updateShuffleSize(String shuffleId, long delta) {
+ public void updateShuffleSize(String shuffleId, long delta) {
ShuffleMeta shuffleMeta = getShuffleMeta(shuffleId);
- return shuffleMeta == null ? 0 : shuffleMeta.getSize().addAndGet(delta);
+ if (shuffleMeta != null) {
+ shuffleMeta.getSize().addAndGet(delta);
+ }
}
- public long updateUploadedShuffleSize(String shuffleKey, long delta) {
+ public void updateUploadedShuffleSize(String shuffleKey, long delta) {
ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
- return shuffleMeta == null ? 0 : shuffleMeta.uploadedSize.addAndGet(delta);
+ if (shuffleMeta != null) {
+ shuffleMeta.uploadedSize.addAndGet(delta);
+ }
}
public void addShufflePartitionList(String shuffleKey, List<Integer>
partitions) {
@@ -134,16 +138,6 @@ public class LocalStorageMeta {
}
}
- public void removeShufflePartitionList(String shuffleKey, List<Integer>
partitions) {
- ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
- if (shuffleMeta != null) {
- RoaringBitmap bitmap = shuffleMeta.partitionBitmap;
- synchronized (bitmap) {
- partitions.forEach(bitmap::remove);
- }
- }
- }
-
public void remoteShuffle(String shuffleKey) {
shuffleMetaMap.remove(shuffleKey);
}
@@ -197,11 +191,6 @@ public class LocalStorageMeta {
return shuffleMeta;
}
- public long getShuffleLastReadTs(String shuffleKey) {
- ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
- return shuffleMeta == null ? -1 : shuffleMeta.getShuffleLastReadTs();
- }
-
public void updateShuffleLastReadTs(String shuffleKey) {
ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
if (shuffleMeta != null) {
@@ -222,24 +211,16 @@ public class LocalStorageMeta {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final AtomicBoolean isStartRead = new AtomicBoolean(false);
private final RoaringBitmap uploadedPartitionBitmap =
RoaringBitmap.bitmapOf();
- private AtomicLong lastReadTs = new AtomicLong(-1L);
+ private final AtomicLong lastReadTs = new AtomicLong(-1L);
public AtomicLong getSize() {
return size;
}
- public AtomicLong getUploadedSize() {
- return uploadedSize;
- }
-
public long getNotUploadedSize() {
return size.longValue() - uploadedSize.longValue();
}
- public boolean isStartRead() {
- return isStartRead.get();
- }
-
public void markStartRead() {
isStartRead.set(true);
}
@@ -248,11 +229,6 @@ public class LocalStorageMeta {
lastReadTs.set(System.currentTimeMillis());
}
-
- public long getShuffleLastReadTs() {
- return lastReadTs.get();
- }
-
public ReadWriteLock getLock() {
return lock;
}
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
index 4aa7e2e3..3f1a17cd 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
@@ -70,11 +70,9 @@ public class LocalStorageTest {
private LocalStorage createTestStorage(File baseDir) {
return LocalStorage.newBuilder().basePath(baseDir.getAbsolutePath())
- .cleanupThreshold(50)
.highWaterMarkOfWrite(95)
.lowWaterMarkOfWrite(80)
.capacity(100)
- .cleanIntervalMs(5000)
.build();
}
@@ -97,12 +95,10 @@ public class LocalStorageTest {
@Test
public void getCapacityInitTest() {
LocalStorage item =
LocalStorage.newBuilder().basePath(testBaseDir.getAbsolutePath())
- .cleanupThreshold(50)
.highWaterMarkOfWrite(95)
.lowWaterMarkOfWrite(80)
.capacity(-1)
.ratio(0.1)
- .cleanIntervalMs(5000)
.build();
assertEquals((long) (testBaseDir.getTotalSpace() * 0.1),
item.getCapacity());
}
@@ -216,22 +212,18 @@ public class LocalStorageTest {
public void diskStorageInfoTest() {
LocalStorage item = LocalStorage.newBuilder()
.basePath(testBaseDir.getAbsolutePath())
- .cleanupThreshold(50)
.highWaterMarkOfWrite(95)
.lowWaterMarkOfWrite(80)
.capacity(100)
- .cleanIntervalMs(5000)
.build();
assertEquals(mountPoint, item.getMountPoint());
assertNull(item.getStorageMedia());
LocalStorage itemWithStorageType = LocalStorage.newBuilder()
.basePath(testBaseDir.getAbsolutePath())
- .cleanupThreshold(50)
.highWaterMarkOfWrite(95)
.lowWaterMarkOfWrite(80)
.capacity(100)
- .cleanIntervalMs(5000)
.localStorageMedia(StorageMedia.SSD)
.build();
assertEquals(StorageMedia.SSD, itemWithStorageType.getStorageMedia());