This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch revert-1184-diskfull
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit da77d6974ad793288faaa852016fd5bb52a78b36
Author: Ethan Feng <[email protected]>
AuthorDate: Thu Feb 2 14:29:24 2023 +0800

    Revert "[CELEBORN-243] fix bug that os's disk usage is low but celeborn thinks that it's high_disk_usage (#1184)"
    
    This reverts commit ff17a61ec5ca85f6529328b4e53aabb40dd5f40c.
---
 .../celeborn/service/deploy/worker/Worker.scala    |  20 +---
 .../deploy/worker/storage/WorkerSuite.scala        | 132 ---------------------
 2 files changed, 4 insertions(+), 148 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 46c58d0d..3e61a9d9 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -24,6 +24,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import com.google.common.annotations.VisibleForTesting
 import io.netty.util.HashedWheelTimer
@@ -46,7 +47,7 @@ import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.rpc._
 import org.apache.celeborn.common.util.{ShutdownHookManager, ThreadUtils, 
Utils}
 import org.apache.celeborn.server.common.{HttpService, Service}
-import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, 
PartitionFilesSorter, StorageManager}
+import 
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, 
StorageManager}
 
 private[celeborn] class Worker(
     override val conf: CelebornConf,
@@ -408,22 +409,19 @@ private[celeborn] class Worker(
     // If worker register still failed after retry, throw exception to stop 
worker process
     throw new CelebornException("Register worker failed.", exception)
   }
-  @VisibleForTesting
-  def cleanup(expiredShuffleKeys: JHashSet[String]): Unit = synchronized {
+
+  private def cleanup(expiredShuffleKeys: JHashSet[String]): Unit = 
synchronized {
     expiredShuffleKeys.asScala.foreach { shuffleKey =>
       partitionLocationInfo.getAllMasterLocations(shuffleKey).asScala.foreach 
{ partition =>
         val fileWriter = partition.asInstanceOf[WorkingPartition].getFileWriter
         fileWriter.destroy(new IOException(
           s"Destroy FileWriter ${fileWriter} caused by shuffle ${shuffleKey} 
expired."))
-        removeExpiredWorkingDirWriters(fileWriter)
       }
       partitionLocationInfo.getAllSlaveLocations(shuffleKey).asScala.foreach { 
partition =>
         val fileWriter = partition.asInstanceOf[WorkingPartition].getFileWriter
         fileWriter.destroy(new IOException(
           s"Destroy FileWriter ${fileWriter} caused by shuffle ${shuffleKey} 
expired."))
-        removeExpiredWorkingDirWriters(fileWriter)
       }
-
       partitionLocationInfo.removeMasterPartitions(shuffleKey)
       partitionLocationInfo.removeSlavePartitions(shuffleKey)
       shufflePartitionType.remove(shuffleKey)
@@ -436,16 +434,6 @@ private[celeborn] class Worker(
     storageManager.cleanupExpiredShuffleKey(expiredShuffleKeys)
   }
 
-  @VisibleForTesting
-  def removeExpiredWorkingDirWriters(fileWriter: FileWriter): Unit = {
-    // filepath is dir/appId/shuffleId/filename
-    val dir = fileWriter.getFile.getParentFile.getParentFile.getParentFile
-    storageManager.workingDirWriters.asScala.get(dir).map(f =>
-      f.synchronized {
-        f.remove(fileWriter)
-      })
-  }
-
   override def getWorkerInfo: String = workerInfo.toString()
 
   override def getThreadDump: String = Utils.getThreadDump()
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
deleted file mode 100644
index 0939f168..00000000
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.service.deploy.worker.storage
-
-import java.io.File
-import java.util
-import java.util.{HashSet => JHashSet}
-
-import org.junit.Assert
-import org.mockito.MockitoSugar._
-import org.scalatest.funsuite.AnyFunSuite
-
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.meta.FileInfo
-import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionType}
-import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments, 
WorkerSource, WorkingPartition}
-
-class WorkerSuite extends AnyFunSuite {
-  val conf = new CelebornConf()
-  val workerArgs = new WorkerArguments(Array(), conf)
-  test("clean up") {
-
-    val worker = new Worker(conf, workerArgs)
-    val expiredShuffleKeys = new JHashSet[String]()
-    val shuffleKey1 = "s1"
-    val shuffleKey2 = "s2"
-    expiredShuffleKeys.add(shuffleKey1)
-    expiredShuffleKeys.add(shuffleKey2)
-    val pl1 = new PartitionLocation(0, 0, "12", 0, 0, 0, 0, 
PartitionLocation.Mode.MASTER)
-    val pl2 = new PartitionLocation(0, 0, "12", 0, 0, 0, 0, 
PartitionLocation.Mode.SLAVE)
-
-    val dir1 = new File("/tmp/work1")
-    val dir2 = new File("/tmp/work2")
-    val dir3 = new File("/tmp/work3")
-
-    val filePath1 = new File(dir1, "/1/1")
-    filePath1.mkdirs()
-    val file1 = new File(filePath1, "/1")
-    file1.createNewFile()
-    val filePath2 = new File(dir2, "/2/2")
-    filePath2.mkdirs()
-    val file2 = new File(filePath2, "/2")
-    file2.createNewFile()
-
-    val filePath3 = new File(dir3, "/3/3")
-    filePath3.mkdirs()
-    val file3 = new File(filePath3, "/3")
-    file3.createNewFile()
-
-    val fw1 = new ReducePartitionFileWriter(
-      new FileInfo(file1.getAbsolutePath, null, null, PartitionType.REDUCE),
-      mock[Flusher],
-      new WorkerSource(conf),
-      conf,
-      mock[DeviceMonitor],
-      0,
-      null,
-      false)
-    fw1.registerDestroyHook(new util.ArrayList(util.Arrays.asList(fw1)))
-    val fw2 = new ReducePartitionFileWriter(
-      new FileInfo(file2.getAbsolutePath, null, null, PartitionType.REDUCE),
-      mock[Flusher],
-      new WorkerSource(conf),
-      conf,
-      mock[DeviceMonitor],
-      0,
-      null,
-      false)
-    fw2.registerDestroyHook(new util.ArrayList(util.Arrays.asList(fw2)))
-    val fw3 = new ReducePartitionFileWriter(
-      new FileInfo(file3.getAbsolutePath, null, null, PartitionType.REDUCE),
-      mock[Flusher],
-      new WorkerSource(conf),
-      conf,
-      mock[DeviceMonitor],
-      0,
-      null,
-      false)
-    fw3.registerDestroyHook(new util.ArrayList(util.Arrays.asList(fw3)))
-
-    val wl1 = new WorkingPartition(pl1, fw1)
-    val wl2 = new WorkingPartition(pl2, fw2)
-    val wl3 = new WorkingPartition(pl2, fw3)
-    worker.partitionLocationInfo.addMasterPartitions(shuffleKey1, 
util.Arrays.asList(wl1))
-    worker.partitionLocationInfo.addMasterPartitions(shuffleKey1, 
util.Arrays.asList(wl3))
-    worker.partitionLocationInfo.addSlavePartitions(shuffleKey1, 
util.Arrays.asList(wl2))
-
-    val fws1 = new util.ArrayList[FileWriter]()
-    fws1.add(fw1)
-    val fws2 = new util.ArrayList[FileWriter]()
-    fws2.add(fw2)
-    worker.storageManager.workingDirWriters.put(dir1, fws1)
-    worker.storageManager.workingDirWriters.put(dir2, fws2)
-    Assert.assertEquals(1, 
worker.storageManager.workingDirWriters.get(dir1).size())
-    Assert.assertEquals(1, 
worker.storageManager.workingDirWriters.get(dir2).size())
-    worker.cleanup(expiredShuffleKeys)
-    Assert.assertEquals(0, 
worker.storageManager.workingDirWriters.get(dir1).size())
-    Assert.assertEquals(0, 
worker.storageManager.workingDirWriters.get(dir2).size())
-
-    deleteFile(dir1)
-    deleteFile(dir2)
-    deleteFile(dir3)
-  }
-  def deleteFile(dir: File): Unit = {
-    val files = dir.listFiles();
-    if (files != null) {
-      files.foreach(file => {
-        if (file.isFile()) {
-          file.delete();
-        } else {
-          deleteFile(file);
-        }
-      })
-      dir.delete();
-    }
-  }
-}

Reply via email to