This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a4fd04f Revert "[CELEBORN-120] MapPartitionFileWriter completes some functions to support mappartition (#1062)"
3a4fd04f is described below

commit 3a4fd04f3376384677c5a38b3d30e8c699d731af
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Sat Dec 17 11:10:54 2022 +0800

    Revert "[CELEBORN-120] MapPartitionFileWriter completes some functions to support mappartition (#1062)"
    
    This reverts commit aa5a4d8c41b009b7508fdf1c9ee16e78c3c1d28c.
---
 .../worker/storage/MapPartitionFileWriter.java     | 194 +--------------------
 .../storage/MapPartitionFileWriterSuiteJ.java      | 159 -----------------
 2 files changed, 3 insertions(+), 350 deletions(-)

diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
index 07a63ad8..84bf8503 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
@@ -20,15 +20,10 @@ package org.apache.celeborn.service.deploy.worker.storage;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
-import java.util.Arrays;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,9 +32,6 @@ import org.apache.celeborn.common.meta.FileInfo;
 import org.apache.celeborn.common.metrics.source.AbstractSource;
 import org.apache.celeborn.common.protocol.PartitionSplitMode;
 import org.apache.celeborn.common.protocol.PartitionType;
-import org.apache.celeborn.common.unsafe.Platform;
-import org.apache.celeborn.common.util.Utils;
-import org.apache.celeborn.service.deploy.worker.WorkerSource;
 
 /*
  * map partition file writer, it will create index for each partition
@@ -85,98 +77,11 @@ public final class MapPartitionFileWriter extends FileWriter {
     } else {
       streamIndex = 
StorageManager.hadoopFs().create(fileInfo.getHdfsIndexPath(), true);
     }
-    takeBufferIndex();
-  }
-
-  private void takeBufferIndex() {
-    // metrics start
-    String metricsName = null;
-    String fileAbsPath = null;
-    if (source.metricsCollectCriticalEnabled()) {
-      metricsName = WorkerSource.TakeBufferTimeIndex();
-      fileAbsPath = fileInfo.getIndexPath();
-      source.startTimer(metricsName, fileAbsPath);
-    }
-
-    // real action
-    flushBufferIndex = flusher.takeBuffer();
-
-    // metrics end
-    if (source.metricsCollectCriticalEnabled()) {
-      source.stopTimer(metricsName, fileAbsPath);
-    }
-
-    if (flushBufferIndex == null) {
-      IOException e =
-          new IOException(
-              "Take buffer index encounter error from Flusher: " + 
flusher.bufferQueueInfo());
-      notifier.setException(e);
-    }
-  }
-
-  public void write(ByteBuf data) throws IOException {
-    byte[] header = new byte[16];
-    data.markReaderIndex();
-    data.readBytes(header);
-    data.resetReaderIndex();
-    int partitionId = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET);
-    collectPartitionDataLength(partitionId, data);
-
-    super.write(data);
-  }
-
-  private void collectPartitionDataLength(int partitionId, ByteBuf data) {
-    if (numReducePartitionBytes == null) {
-      numReducePartitionBytes = new long[numReducePartitions];
-    }
-    currentReducePartition = partitionId;
-    long length = data.readableBytes();
-    totalBytes += length;
-    numReducePartitionBytes[partitionId] += length;
   }
 
   @Override
-  public synchronized long close() throws IOException {
-    return super.close(
-        () -> {
-          if (flushBufferIndex.readableBytes() > 0) {
-            flushIndex();
-          }
-        },
-        () -> {
-          if 
(StorageManager.hdfsFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
-            StorageManager.hdfsFs().delete(fileInfo.getHdfsPath(), false);
-            deleted = true;
-          } else {
-            
StorageManager.hdfsFs().create(fileInfo.getHdfsWriterSuccessPath()).close();
-          }
-        },
-        () -> {
-          returnBufferIndex();
-          if (channelIndex != null) {
-            channelIndex.close();
-          }
-          if (streamIndex != null) {
-            streamIndex.close();
-            if (StorageManager.hdfsFs()
-                .exists(
-                    new Path(
-                        Utils.getWriteSuccessFilePath(
-                            Utils.getPeerPath(fileInfo.getIndexPath()))))) {
-              StorageManager.hdfsFs().delete(fileInfo.getHdfsIndexPath(), 
false);
-              deleted = true;
-            } else {
-              StorageManager.hdfsFs()
-                  .create(new 
Path(Utils.getWriteSuccessFilePath((fileInfo.getIndexPath()))))
-                  .close();
-            }
-          }
-        });
-  }
-
-  public synchronized void destroy(IOException ioException) {
-    destroyIndex();
-    super.destroy(ioException);
+  public long close() throws IOException {
+    return 0;
   }
 
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
@@ -189,98 +94,5 @@ public final class MapPartitionFileWriter extends FileWriter {
     this.isBroadcastRegion = isBroadcastRegion;
   }
 
-  public void regionFinish() throws IOException {
-    if (regionStartingOffset == totalBytes) {
-      return;
-    }
-
-    long fileOffset = regionStartingOffset;
-    if (indexBuffer == null) {
-      indexBuffer = allocateIndexBuffer(numReducePartitions);
-    }
-
-    // write the index information of the current data region
-    for (int partitionIndex = 0; partitionIndex < numReducePartitions; 
++partitionIndex) {
-      indexBuffer.putLong(fileOffset);
-      if (!isBroadcastRegion) {
-        indexBuffer.putLong(numReducePartitionBytes[partitionIndex]);
-        fileOffset += numReducePartitionBytes[partitionIndex];
-      } else {
-        indexBuffer.putLong(numReducePartitionBytes[0]);
-      }
-    }
-
-    if (!indexBuffer.hasRemaining()) {
-      flushIndex();
-      takeBufferIndex();
-    }
-
-    ++numDataRegions;
-    regionStartingOffset = totalBytes;
-    Arrays.fill(numReducePartitionBytes, 0);
-  }
-
-  private synchronized void returnBufferIndex() {
-    if (flushBufferIndex != null) {
-      flusher.returnBuffer(flushBufferIndex);
-      flushBufferIndex = null;
-    }
-  }
-
-  private synchronized void destroyIndex() {
-    returnBufferIndex();
-    try {
-      if (channelIndex != null) {
-        channelIndex.close();
-      }
-      if (streamIndex != null) {
-        streamIndex.close();
-      }
-    } catch (IOException e) {
-      logger.warn(
-          "Close channel failed for file {} caused by {}.",
-          fileInfo.getIndexPath(),
-          e.getMessage());
-    }
-  }
-
-  private void flushIndex() throws IOException {
-    indexBuffer.flip();
-    notifier.checkException();
-    notifier.numPendingFlushes.incrementAndGet();
-    if (indexBuffer.hasRemaining()) {
-      FlushTask task = null;
-      if (channelIndex != null) {
-        Unpooled.wrappedBuffer(indexBuffer);
-        task = new LocalFlushTask(flushBufferIndex, channelIndex, notifier);
-      } else if (streamIndex != null) {
-        task = new HdfsFlushTask(flushBufferIndex, streamIndex, notifier);
-      }
-      addTask(task);
-      flushBufferIndex = null;
-    }
-    indexBuffer.clear();
-  }
-
-  private ByteBuffer allocateIndexBuffer(int numPartitions) {
-
-    // the returned buffer size is no smaller than 4096 bytes to improve disk 
IO performance
-    int minBufferSize = 4096;
-
-    int indexRegionSize = numPartitions * (8 + 8);
-    if (indexRegionSize >= minBufferSize) {
-      ByteBuffer buffer = ByteBuffer.allocateDirect(indexRegionSize);
-      buffer.order(ByteOrder.BIG_ENDIAN);
-      return buffer;
-    }
-
-    int numRegions = minBufferSize / indexRegionSize;
-    if (minBufferSize % indexRegionSize != 0) {
-      ++numRegions;
-    }
-    ByteBuffer buffer = ByteBuffer.allocateDirect(numRegions * 
indexRegionSize);
-    buffer.order(ByteOrder.BIG_ENDIAN);
-
-    return buffer;
-  }
+  public void regionFinish() throws IOException {}
 }
diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java
deleted file mode 100644
index c68c5f9c..00000000
--- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.service.deploy.worker.storage;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-
-import scala.Function0;
-import scala.collection.mutable.ListBuffer;
-
-import io.netty.buffer.Unpooled;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.identity.UserIdentifier;
-import org.apache.celeborn.common.meta.FileInfo;
-import org.apache.celeborn.common.network.server.memory.MemoryManager;
-import org.apache.celeborn.common.network.util.JavaUtils;
-import org.apache.celeborn.common.protocol.PartitionSplitMode;
-import org.apache.celeborn.common.protocol.StorageInfo;
-import org.apache.celeborn.common.unsafe.Platform;
-import org.apache.celeborn.common.util.Utils;
-import org.apache.celeborn.service.deploy.worker.WorkerSource;
-
-public class MapPartitionFileWriterSuiteJ {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(MapPartitionFileWriterSuiteJ.class);
-
-  private static final CelebornConf CONF = new CelebornConf();
-  public static final Long SPLIT_THRESHOLD = 256 * 1024 * 1024L;
-  public static final PartitionSplitMode splitMode = PartitionSplitMode.HARD;
-
-  private static File tempDir = null;
-  private static LocalFlusher localFlusher = null;
-  private static WorkerSource source = null;
-
-  private final UserIdentifier userIdentifier = new 
UserIdentifier("mock-tenantId", "mock-name");
-
-  @BeforeClass
-  public static void beforeAll() {
-    tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"celeborn");
-
-    source = Mockito.mock(WorkerSource.class);
-    Mockito.doAnswer(
-            invocationOnMock -> {
-              Function0<?> function = (Function0<?>) 
invocationOnMock.getArguments()[2];
-              return function.apply();
-            })
-        .when(source)
-        .sample(Mockito.anyString(), Mockito.anyString(), 
Mockito.any(Function0.class));
-
-    ListBuffer<File> dirs = new ListBuffer<>();
-    dirs.$plus$eq(tempDir);
-    localFlusher =
-        new LocalFlusher(
-            source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, "disk1", 20, 1, 
StorageInfo.Type.HDD);
-    MemoryManager.initialize(0.8, 0.9, 0.5, 0.6, 0.1, 0.1, 10, 10);
-  }
-
-  @AfterClass
-  public static void afterAll() {
-    if (tempDir != null) {
-      try {
-        JavaUtils.deleteRecursively(tempDir);
-        tempDir = null;
-      } catch (IOException e) {
-        LOG.error("Failed to delete temp dir.", e);
-      }
-    }
-  }
-
-  @Test
-  public void testMultiThreadWrite() throws IOException, ExecutionException, 
InterruptedException {
-    File file = getTemporaryFile();
-    MapPartitionFileWriter fileWriter =
-        new MapPartitionFileWriter(
-            new FileInfo(file, userIdentifier),
-            localFlusher,
-            source,
-            CONF,
-            DeviceMonitor$.MODULE$.EmptyMonitor(),
-            SPLIT_THRESHOLD,
-            splitMode,
-            false);
-    fileWriter.pushDataHandShake(2);
-    fileWriter.regionStart(0, false);
-    byte[] partData0 = generateData(0);
-    byte[] partData1 = generateData(1);
-    AtomicLong length = new AtomicLong(0);
-    try {
-      fileWriter.write(Unpooled.wrappedBuffer(partData0));
-      length.addAndGet(partData0.length);
-      fileWriter.write(Unpooled.wrappedBuffer(partData1));
-      length.addAndGet(partData1.length);
-
-      fileWriter.regionFinish();
-    } catch (IOException e) {
-      LOG.error("Failed to write buffer.", e);
-    }
-
-    long bytesWritten = fileWriter.close();
-
-    assertEquals(length.get(), bytesWritten);
-    assertEquals(fileWriter.getFile().length(), bytesWritten);
-  }
-
-  private File getTemporaryFile() throws IOException {
-    String filename = UUID.randomUUID().toString();
-    File temporaryFile = new File(tempDir, filename);
-    temporaryFile.createNewFile();
-    return temporaryFile;
-  }
-
-  private byte[] generateData(int partitionId) {
-    ThreadLocalRandom rand = ThreadLocalRandom.current();
-    byte[] hello = "hello, world".getBytes(StandardCharsets.UTF_8);
-    int headerLength = 16;
-    int tempLen = rand.nextInt(256 * 1024) + 128 * 1024 - headerLength;
-    int len = (int) (Math.ceil(1.0 * tempLen / hello.length) * hello.length) + 
headerLength;
-
-    byte[] data = new byte[len];
-    Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, partitionId);
-    Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET + 4, 0);
-    Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET + 8, rand.nextInt());
-    Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET + 12, len);
-
-    for (int i = headerLength; i < len; i += hello.length) {
-      System.arraycopy(hello, 0, data, i, hello.length);
-    }
-    return data;
-  }
-}

Reply via email to