This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ea1c6301 [CELEBORN-80] FileWriter supports MapPartition (#1025)
ea1c6301 is described below
commit ea1c630173da41a6519e45d8e783bc6f1f68493d
Author: zhongqiangczq <[email protected]>
AuthorDate: Thu Dec 8 10:46:26 2022 +0800
[CELEBORN-80] FileWriter supports MapPartition (#1025)
---
.../service/deploy/worker/storage/FileWriter.java | 126 ++++++---------------
.../worker/storage/MapPartitionFileWriter.java | 97 ++++++++++++++++
.../worker/storage/ReducePartitionFileWriter.java | 113 ++++++++++++++++++
.../service/deploy/worker/PushDataHandler.scala | 9 +-
.../service/deploy/worker/WorkerSource.scala | 1 +
.../deploy/worker/storage/StorageManager.scala | 62 ++++++----
.../deploy/worker/storage/FileWriterSuiteJ.java | 9 +-
7 files changed, 296 insertions(+), 121 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 175da298..5914765c 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -48,58 +48,38 @@ import
org.apache.celeborn.service.deploy.worker.WorkerSource;
* Note: Once FlushNotifier.exception is set, the whole file is not available.
* That's fine some of the internal state(e.g. bytesFlushed) may be
inaccurate.
*/
-public final class FileWriter implements DeviceObserver {
+public abstract class FileWriter implements DeviceObserver {
private static final Logger logger =
LoggerFactory.getLogger(FileWriter.class);
private static final long WAIT_INTERVAL_MS = 20;
- private final FileInfo fileInfo;
- private FileChannel channel;
- private FSDataOutputStream stream;
- private volatile boolean closed;
- private volatile boolean destroyed;
+ protected final FileInfo fileInfo;
+ protected FileChannel channel;
+ protected FSDataOutputStream stream;
+ protected volatile boolean closed;
+ protected volatile boolean destroyed;
- private final AtomicInteger numPendingWrites = new AtomicInteger();
- private long nextBoundary;
- private long bytesFlushed;
+ protected final AtomicInteger numPendingWrites = new AtomicInteger();
+ protected long bytesFlushed;
public final Flusher flusher;
private final int flushWorkerIndex;
- private CompositeByteBuf flushBuffer;
+ protected CompositeByteBuf flushBuffer;
- private final long shuffleChunkSize;
private final long writerCloseTimeoutMs;
- private final long flusherBufferSize;
+ protected final long flusherBufferSize;
- private final DeviceMonitor deviceMonitor;
- private final AbstractSource source; // metrics
+ protected final DeviceMonitor deviceMonitor;
+ protected final AbstractSource source; // metrics
private long splitThreshold = 0;
private final PartitionSplitMode splitMode;
private final PartitionType partitionType;
private final boolean rangeReadFilter;
-
private Runnable destroyHook;
- private boolean deleted = false;
+ protected boolean deleted = false;
private RoaringBitmap mapIdBitMap = null;
-
- private final FlushNotifier notifier = new FlushNotifier();
-
- // //////////////////////////////////////////////////////
- // map partition //
- // //////////////////////////////////////////////////////
-
- /** Number of reducepartitions */
- private int numReducePartitions;
-
- /** Index number of the current data region being written. */
- private int currentDataRegionIndex;
-
- /**
- * Whether current data region is a broadcast region or not. If true,
buffers added to this region
- * will be written to all reduce partitions.
- */
- private boolean isBroadcastRegion;
+ protected final FlushNotifier notifier = new FlushNotifier();
public FileWriter(
FileInfo fileInfo,
@@ -115,8 +95,6 @@ public final class FileWriter implements DeviceObserver {
this.fileInfo = fileInfo;
this.flusher = flusher;
this.flushWorkerIndex = flusher.getWorkerIndex();
- this.shuffleChunkSize = conf.shuffleChunkSize();
- this.nextBoundary = this.shuffleChunkSize;
this.writerCloseTimeoutMs = conf.writerCloseTimeoutMs();
this.splitThreshold = splitThreshold;
this.flusherBufferSize = conf.workerFlusherBufferSize();
@@ -153,7 +131,7 @@ public final class FileWriter implements DeviceObserver {
numPendingWrites.decrementAndGet();
}
- private void flush(boolean finalFlush) throws IOException {
+ protected void flush(boolean finalFlush) throws IOException {
int numBytes = flushBuffer.readableBytes();
notifier.checkException();
notifier.numPendingFlushes.incrementAndGet();
@@ -166,26 +144,6 @@ public final class FileWriter implements DeviceObserver {
addTask(task);
flushBuffer = null;
bytesFlushed += numBytes;
- maybeSetChunkOffsets(finalFlush);
- }
-
- private void maybeSetChunkOffsets(boolean forceSet) {
- if (bytesFlushed >= nextBoundary || forceSet) {
- fileInfo.addChunkOffset(bytesFlushed);
- nextBoundary = bytesFlushed + shuffleChunkSize;
- }
- }
-
- private boolean isChunkOffsetValid() {
- // Consider a scenario where some bytes have been flushed
- // but the chunk offset boundary has not yet been updated.
- // we should check if the chunk offset boundary equals
- // bytesFlush or not. For example:
- // The last record is a giant record and it has been flushed
- // but its size is smaller than the nextBoundary, then the
- // chunk offset will not be set after flushing. we should
- // set it during FileWriter close.
- return fileInfo.getLastChunkOffset() == bytesFlushed;
}
/**
@@ -254,7 +212,18 @@ public final class FileWriter implements DeviceObserver {
}
}
- public synchronized long close() throws IOException {
+ public abstract long close() throws IOException;
+
+ @FunctionalInterface
+ public interface RunnableWithException<R extends IOException> {
+ void run() throws R;
+ }
+
+ protected synchronized long close(
+ RunnableWithException tryClose,
+ RunnableWithException streamClose,
+ RunnableWithException finalClose)
+ throws IOException {
if (closed) {
String msg = "FileWriter has already closed! fileName " +
fileInfo.getFilePath();
logger.error(msg);
@@ -269,9 +238,7 @@ public final class FileWriter implements DeviceObserver {
if (flushBuffer.readableBytes() > 0) {
flush(true);
}
- if (!isChunkOffsetValid()) {
- maybeSetChunkOffsets(true);
- }
+ tryClose.run();
}
waitOnNoPending(notifier.numPendingFlushes);
@@ -283,24 +250,14 @@ public final class FileWriter implements DeviceObserver {
}
if (stream != null) {
stream.close();
- if
(StorageManager.hdfsFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
- StorageManager.hdfsFs().delete(fileInfo.getHdfsPath(), false);
- deleted = true;
- } else {
-
StorageManager.hdfsFs().create(fileInfo.getHdfsWriterSuccessPath()).close();
- FSDataOutputStream indexOutputStream =
- StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath());
- indexOutputStream.writeInt(fileInfo.getChunkOffsets().size());
- for (Long offset : fileInfo.getChunkOffsets()) {
- indexOutputStream.writeLong(offset);
- }
- indexOutputStream.close();
- }
+ streamClose.run();
}
} catch (IOException e) {
logger.warn("close file writer" + this + "failed", e);
}
+ finalClose.run();
+
// unregister from DeviceMonitor
if (!fileInfo.isHdfs()) {
logger.debug("file info {} register from device monitor");
@@ -364,7 +321,7 @@ public final class FileWriter implements DeviceObserver {
}
}
- private void waitOnNoPending(AtomicInteger counter) throws IOException {
+ protected void waitOnNoPending(AtomicInteger counter) throws IOException {
long waitTime = writerCloseTimeoutMs;
while (counter.get() > 0 && waitTime > 0) {
try {
@@ -385,7 +342,7 @@ public final class FileWriter implements DeviceObserver {
notifier.checkException();
}
- private void takeBuffer() {
+ protected void takeBuffer() {
// metrics start
String metricsName = null;
String fileAbsPath = null;
@@ -410,7 +367,7 @@ public final class FileWriter implements DeviceObserver {
}
}
- private void addTask(FlushTask task) throws IOException {
+ protected void addTask(FlushTask task) throws IOException {
if (!flusher.addTask(task, writerCloseTimeoutMs, flushWorkerIndex)) {
IOException e = new IOException("Add flush task timeout.");
notifier.setException(e);
@@ -418,7 +375,7 @@ public final class FileWriter implements DeviceObserver {
}
}
- private synchronized void returnBuffer() {
+ protected synchronized void returnBuffer() {
if (flushBuffer != null) {
flusher.returnBuffer(flushBuffer);
flushBuffer = null;
@@ -482,17 +439,4 @@ public final class FileWriter implements DeviceObserver {
public PartitionType getPartitionType() {
return partitionType;
}
-
- public void pushDataHandShake(int numReducePartitions) {
- this.numReducePartitions = numReducePartitions;
- }
-
- public void regionStart(int currentDataRegionIndex, boolean
isBroadcastRegion) {
- this.currentDataRegionIndex = currentDataRegionIndex;
- this.isBroadcastRegion = isBroadcastRegion;
- }
-
- public void regionFinish() {
- // flush index
- }
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
new file mode 100644
index 00000000..22a2410e
--- /dev/null
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import io.netty.buffer.CompositeByteBuf;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.common.protocol.PartitionSplitMode;
+import org.apache.celeborn.common.protocol.PartitionType;
+
+/*
+ * Map partition file writer; it will create an index for each partition.
+ */
+public final class MapPartitionFileWriter extends FileWriter {
+ private static final Logger logger =
LoggerFactory.getLogger(MapPartitionFileWriter.class);
+
+ private int numReducePartitions;
+ private int currentDataRegionIndex;
+ private boolean isBroadcastRegion;
+ private long[] numReducePartitionBytes;
+ private ByteBuffer indexBuffer;
+ private int currentReducePartition;
+ private long totalBytes;
+ private long regionStartingOffset;
+ private long numDataRegions;
+ private FileChannel channelIndex;
+ private FSDataOutputStream streamIndex;
+ private CompositeByteBuf flushBufferIndex;
+
+ public MapPartitionFileWriter(
+ FileInfo fileInfo,
+ Flusher flusher,
+ AbstractSource workerSource,
+ CelebornConf conf,
+ DeviceMonitor deviceMonitor,
+ long splitThreshold,
+ PartitionSplitMode splitMode,
+ boolean rangeReadFilter)
+ throws IOException {
+ super(
+ fileInfo,
+ flusher,
+ workerSource,
+ conf,
+ deviceMonitor,
+ splitThreshold,
+ splitMode,
+ PartitionType.MAP,
+ rangeReadFilter);
+ if (!fileInfo.isHdfs()) {
+ channelIndex = new
FileOutputStream(fileInfo.getIndexPath()).getChannel();
+ } else {
+ streamIndex =
StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath(), true);
+ }
+ }
+
+ @Override
+ public long close() throws IOException {
+ return 0;
+ }
+
+ public void pushDataHandShake(int numReducePartitions) {
+ this.numReducePartitions = numReducePartitions;
+ }
+
+ public void regionStart(int currentDataRegionIndex, boolean
isBroadcastRegion) {
+ this.currentDataRegionIndex = currentDataRegionIndex;
+ this.isBroadcastRegion = isBroadcastRegion;
+ }
+
+ public void regionFinish() throws IOException {}
+}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
new file mode 100644
index 00000000..843841e5
--- /dev/null
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.common.protocol.PartitionSplitMode;
+import org.apache.celeborn.common.protocol.PartitionType;
+
+/*
+ * Reduce partition file writer; it will create a chunk index.
+ */
+public final class ReducePartitionFileWriter extends FileWriter {
+ private static final Logger logger =
LoggerFactory.getLogger(ReducePartitionFileWriter.class);
+
+ private long nextBoundary;
+ private final long shuffleChunkSize;
+
+ public ReducePartitionFileWriter(
+ FileInfo fileInfo,
+ Flusher flusher,
+ AbstractSource workerSource,
+ CelebornConf conf,
+ DeviceMonitor deviceMonitor,
+ long splitThreshold,
+ PartitionSplitMode splitMode,
+ boolean rangeReadFilter)
+ throws IOException {
+ super(
+ fileInfo,
+ flusher,
+ workerSource,
+ conf,
+ deviceMonitor,
+ splitThreshold,
+ splitMode,
+ PartitionType.REDUCE,
+ rangeReadFilter);
+ this.shuffleChunkSize = conf.shuffleChunkSize();
+ this.nextBoundary = this.shuffleChunkSize;
+ }
+
+ protected void flush(boolean finalFlush) throws IOException {
+ super.flush(finalFlush);
+ maybeSetChunkOffsets(finalFlush);
+ }
+
+ private void maybeSetChunkOffsets(boolean forceSet) {
+ if (bytesFlushed >= nextBoundary || forceSet) {
+ fileInfo.addChunkOffset(bytesFlushed);
+ nextBoundary = bytesFlushed + shuffleChunkSize;
+ }
+ }
+
+ private boolean isChunkOffsetValid() {
+ // Consider a scenario where some bytes have been flushed
+ // but the chunk offset boundary has not yet been updated.
+ // we should check if the chunk offset boundary equals
+ // bytesFlush or not. For example:
+ // The last record is a giant record and it has been flushed
+ // but its size is smaller than the nextBoundary, then the
+ // chunk offset will not be set after flushing. we should
+ // set it during FileWriter close.
+ return fileInfo.getLastChunkOffset() == bytesFlushed;
+ }
+
+ public synchronized long close() throws IOException {
+ return super.close(
+ () -> {
+ if (!isChunkOffsetValid()) {
+ maybeSetChunkOffsets(true);
+ }
+ },
+ () -> {
+ if
(StorageManager.hdfsFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
+ StorageManager.hdfsFs().delete(fileInfo.getHdfsPath(), false);
+ deleted = true;
+ } else {
+
StorageManager.hdfsFs().create(fileInfo.getHdfsWriterSuccessPath()).close();
+ FSDataOutputStream indexOutputStream =
+ StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath());
+ indexOutputStream.writeInt(fileInfo.getChunkOffsets().size());
+ for (Long offset : fileInfo.getChunkOffsets()) {
+ indexOutputStream.writeLong(offset);
+ }
+ indexOutputStream.close();
+ }
+ },
+ () -> {});
+ }
+}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index aba8809b..40cfcb13 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -37,7 +37,7 @@ import
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.PackedPartitionId
-import org.apache.celeborn.service.deploy.worker.storage.{FileWriter,
LocalFlusher, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{FileWriter,
LocalFlusher, MapPartitionFileWriter, StorageManager}
class PushDataHandler extends BaseMessageHandler with Logging {
@@ -617,15 +617,16 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
try {
messageType match {
case Type.PUSH_DATA_HAND_SHAKE => {
-
fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+ fileWriter.asInstanceOf[MapPartitionFileWriter].pushDataHandShake(
+ message.asInstanceOf[PushDataHandShake].numPartitions)
}
case Type.REGION_START => {
- fileWriter.regionStart(
+ fileWriter.asInstanceOf[MapPartitionFileWriter].regionStart(
message.asInstanceOf[RegionStart].currentRegionIndex,
message.asInstanceOf[RegionStart].isBroadcast)
}
case Type.REGION_FINISH => {
- fileWriter.regionFinish()
+ fileWriter.asInstanceOf[MapPartitionFileWriter].regionFinish()
}
}
// for master, send data to slave
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 980bb781..745c486f 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -74,6 +74,7 @@ object WorkerSource {
// flush
val TakeBufferTime = "TakeBufferTime"
+ val TakeBufferTimeIndex = "TakeBufferTimeIndex"
val RegisteredShuffleCount = "RegisteredShuffleCount"
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index e6adfc3f..205d4c3c 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -284,16 +284,27 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
FileSystem.mkdirs(StorageManager.hdfsFs, shuffleDir, hdfsPermission)
val fileInfo =
new FileInfo(new Path(shuffleDir, fileName).toString,
userIdentifier, partitionType)
- val hdfsWriter = new FileWriter(
- fileInfo,
- hdfsFlusher.get,
- workerSource,
- conf,
- deviceMonitor,
- splitThreshold,
- splitMode,
- partitionType,
- rangeReadFilter)
+ val hdfsWriter = partitionType match {
+ case PartitionType.MAP => new MapPartitionFileWriter(
+ fileInfo,
+ hdfsFlusher.get,
+ workerSource,
+ conf,
+ deviceMonitor,
+ splitThreshold,
+ splitMode,
+ rangeReadFilter)
+ case PartitionType.REDUCE => new ReducePartitionFileWriter(
+ fileInfo,
+ hdfsFlusher.get,
+ workerSource,
+ conf,
+ deviceMonitor,
+ splitThreshold,
+ splitMode,
+ rangeReadFilter)
+ }
+
fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName,
fileInfo)
hdfsWriters.synchronized {
hdfsWriters.add(hdfsWriter)
@@ -318,16 +329,27 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
}
val fileInfo = new FileInfo(file.getAbsolutePath, userIdentifier,
partitionType)
- val fileWriter = new FileWriter(
- fileInfo,
- localFlushers.get(mountPoint),
- workerSource,
- conf,
- deviceMonitor,
- splitThreshold,
- splitMode,
- partitionType,
- rangeReadFilter)
+
+ val fileWriter = partitionType match {
+ case PartitionType.MAP => new MapPartitionFileWriter(
+ fileInfo,
+ localFlushers.get(mountPoint),
+ workerSource,
+ conf,
+ deviceMonitor,
+ splitThreshold,
+ splitMode,
+ rangeReadFilter)
+ case PartitionType.REDUCE => new ReducePartitionFileWriter(
+ fileInfo,
+ localFlushers.get(mountPoint),
+ workerSource,
+ conf,
+ deviceMonitor,
+ splitThreshold,
+ splitMode,
+ rangeReadFilter)
+ }
deviceMonitor.registerFileWriter(fileWriter)
val list = workingDirWriters.computeIfAbsent(dir,
workingDirWriterListFunc)
list.synchronized {
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
index f20ea47f..6ef563c2 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
@@ -238,7 +238,7 @@ public class FileWriterSuiteJ {
final int threadsNum = 8;
File file = getTemporaryFile();
FileWriter fileWriter =
- new FileWriter(
+ new ReducePartitionFileWriter(
new FileInfo(file, userIdentifier),
localFlusher,
source,
@@ -246,7 +246,6 @@ public class FileWriterSuiteJ {
DeviceMonitor$.MODULE$.EmptyMonitor(),
SPLIT_THRESHOLD,
splitMode,
- partitionType,
false);
List<Future<?>> futures = new ArrayList<>();
@@ -283,7 +282,7 @@ public class FileWriterSuiteJ {
final int threadsNum = Runtime.getRuntime().availableProcessors();
File file = getTemporaryFile();
FileWriter fileWriter =
- new FileWriter(
+ new ReducePartitionFileWriter(
new FileInfo(file, userIdentifier),
localFlusher,
source,
@@ -291,7 +290,6 @@ public class FileWriterSuiteJ {
DeviceMonitor$.MODULE$.EmptyMonitor(),
SPLIT_THRESHOLD,
splitMode,
- partitionType,
false);
List<Future<?>> futures = new ArrayList<>();
@@ -337,7 +335,7 @@ public class FileWriterSuiteJ {
File file = getTemporaryFile();
FileInfo fileInfo = new FileInfo(file, userIdentifier);
FileWriter fileWriter =
- new FileWriter(
+ new ReducePartitionFileWriter(
fileInfo,
localFlusher,
source,
@@ -345,7 +343,6 @@ public class FileWriterSuiteJ {
DeviceMonitor$.MODULE$.EmptyMonitor(),
SPLIT_THRESHOLD,
splitMode,
- partitionType,
false);
List<Future<?>> futures = new ArrayList<>();