divijvaidya commented on code in PR #12331:
URL: https://github.com/apache/kafka/pull/12331#discussion_r908259484
##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +463,20 @@ private static FileChannel openChannel(File file,
int initFileSize,
boolean preallocate) throws
IOException {
if (mutable) {
- if (fileAlreadyExists || !preallocate) {
- return FileChannel.open(file.toPath(),
StandardOpenOption.CREATE, StandardOpenOption.READ,
- StandardOpenOption.WRITE);
- } else {
- RandomAccessFile randomAccessFile = new RandomAccessFile(file,
"rw");
- randomAccessFile.setLength(initFileSize);
- return randomAccessFile.getChannel();
+ if (preallocate && !fileAlreadyExists) {
+ final OpenOption[] options = {
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW,
StandardOpenOption.SPARSE };
+ try (final SeekableByteChannel channel =
Files.newByteChannel(file.toPath(), options)) {
+
channel.position(initFileSize-Integer.BYTES);
+
+ final ByteBuffer buffer =
ByteBuffer.allocate(Integer.BYTES).putInt(0);
+ buffer.rewind();
+ channel.write(buffer);
+ }
}
+ /* A separate open call is needed even when having a
RandomAccessFile
Review Comment:
This comment may need updating now that we no longer use
`RandomAccessFile`.
##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +463,20 @@ private static FileChannel openChannel(File file,
int initFileSize,
boolean preallocate) throws
IOException {
if (mutable) {
- if (fileAlreadyExists || !preallocate) {
- return FileChannel.open(file.toPath(),
StandardOpenOption.CREATE, StandardOpenOption.READ,
- StandardOpenOption.WRITE);
- } else {
- RandomAccessFile randomAccessFile = new RandomAccessFile(file,
"rw");
- randomAccessFile.setLength(initFileSize);
- return randomAccessFile.getChannel();
+ if (preallocate && !fileAlreadyExists) {
+ final OpenOption[] options = {
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW,
StandardOpenOption.SPARSE };
+ try (final SeekableByteChannel channel =
Files.newByteChannel(file.toPath(), options)) {
+
channel.position(initFileSize-Integer.BYTES);
+
+ final ByteBuffer buffer =
ByteBuffer.allocate(Integer.BYTES).putInt(0);
+ buffer.rewind();
+ channel.write(buffer);
+ }
Review Comment:
This block, which creates an empty file of a given size, could be moved into
a utility class that already holds other file-handling helper methods, such as
`org.apache.kafka.common.utils.Utils.java`.
You could name it: `FileChannel Utils.createPreallocatedFile(Path, Size)`
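A minimal sketch of what such a helper might look like, assuming it simply lifts the logic from the hunk above. The class name `PreallocationSketch`, the `long size` parameter type, the `void` return type, and the size check are assumptions made for illustration; they are not part of the PR or of Kafka's `Utils`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical holder class; the review suggests placing the method in an existing utility class.
public final class PreallocationSketch {
    private PreallocationSketch() { }

    /**
     * Creates a new file of exactly {@code size} bytes by writing four zero bytes
     * at the tail. Fails if the file already exists (CREATE_NEW).
     */
    public static void createPreallocatedFile(Path path, long size) throws IOException {
        // Assumed guard: the tail write needs room for one int.
        if (size < Integer.BYTES)
            throw new IllegalArgumentException("size must be at least " + Integer.BYTES + ": " + size);

        final OpenOption[] options = {
            StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE };
        try (SeekableByteChannel channel = Files.newByteChannel(path, options)) {
            // Move past the current end of the (empty) file; the write below grows it.
            channel.position(size - Integer.BYTES);
            // A freshly allocated buffer is zero-filled with position 0, so no rewind is needed.
            final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
            while (buffer.hasRemaining())
                channel.write(buffer);
        }
    }
}
```

`StandardOpenOption.SPARSE` is only a hint, so whether the unwritten range occupies disk blocks depends on the file system.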
##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +463,20 @@ private static FileChannel openChannel(File file,
int initFileSize,
boolean preallocate) throws
IOException {
if (mutable) {
- if (fileAlreadyExists || !preallocate) {
- return FileChannel.open(file.toPath(),
StandardOpenOption.CREATE, StandardOpenOption.READ,
- StandardOpenOption.WRITE);
- } else {
- RandomAccessFile randomAccessFile = new RandomAccessFile(file,
"rw");
- randomAccessFile.setLength(initFileSize);
- return randomAccessFile.getChannel();
+ if (preallocate && !fileAlreadyExists) {
+ final OpenOption[] options = {
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW,
StandardOpenOption.SPARSE };
Review Comment:
nit: indentation
##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +461,15 @@ private static FileChannel openChannel(File file,
int initFileSize,
boolean preallocate) throws
IOException {
if (mutable) {
- if (fileAlreadyExists || !preallocate) {
- return FileChannel.open(file.toPath(),
StandardOpenOption.CREATE, StandardOpenOption.READ,
- StandardOpenOption.WRITE);
- } else {
- RandomAccessFile randomAccessFile = new RandomAccessFile(file,
"rw");
- randomAccessFile.setLength(initFileSize);
- return randomAccessFile.getChannel();
+ if (preallocate && !fileAlreadyExists) {
+ try (RandomAccessFile randomAccessFile = new
RandomAccessFile(file, "rw")) {
Review Comment:
I might not have made myself clear in the previous comment, so let me try to
clarify. `FileChannel` is an implementation of `SeekableByteChannel`. In the
current code you open the file twice when the preallocate flag is on: once
to fill it with dummy data and a second time to return a channel. We need to
open it only once.
You can use a single `FileChannel` both to write the dummy data and to return
to the caller, so you won't have to open the file twice.
The code change would look like:
```
if (preallocate && !fileAlreadyExists) {
    final OpenOption[] options = { StandardOpenOption.READ, StandardOpenOption.WRITE,
        StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE };
    final FileChannel channel = FileChannel.open(file.toPath(), options);
    channel.position(initFileSize - Integer.BYTES);
    final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0);
    buffer.rewind();
    channel.write(buffer);
    // TODO: reset the position()
    return channel;
} else {
    return FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
        StandardOpenOption.READ, StandardOpenOption.WRITE);
}
```
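One possible way to settle the `reset the position()` TODO is to avoid moving the position in the first place: `FileChannel.write(ByteBuffer, long)` writes at an absolute offset and leaves the channel's position unchanged. The sketch below assumes that approach. The class name `SingleOpenSketch`, the method name `openMutable`, and the close-on-failure handling are illustrative assumptions, not code from the PR.

```java
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

// Hypothetical class and method names, used only for this sketch.
public final class SingleOpenSketch {
    private SingleOpenSketch() { }

    public static FileChannel openMutable(File file,
                                          boolean fileAlreadyExists,
                                          int initFileSize,
                                          boolean preallocate) throws IOException {
        if (preallocate && !fileAlreadyExists) {
            // Open once; the same channel is used for preallocation and returned to the caller.
            final FileChannel channel = FileChannel.open(file.toPath(),
                StandardOpenOption.READ, StandardOpenOption.WRITE,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE);
            try {
                final ByteBuffer zeros = ByteBuffer.allocate(Integer.BYTES);
                long offset = (long) initFileSize - Integer.BYTES;
                // Positional write: grows the file but does not change channel.position().
                while (zeros.hasRemaining())
                    offset += channel.write(zeros, offset);
                return channel;
            } catch (IOException | RuntimeException e) {
                // Assumed cleanup: do not leak the channel if preallocation fails.
                channel.close();
                throw e;
            }
        }
        return FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
            StandardOpenOption.READ, StandardOpenOption.WRITE);
    }
}
```

With the relative `position(...)` plus `write(...)` sequence from the snippet above, the channel would instead end up positioned at `initFileSize`, so an explicit `channel.position(0)` before returning would be the alternative.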
##########
core/src/main/scala/kafka/log/AbstractIndex.scala:
##########
@@ -108,22 +109,33 @@ abstract class AbstractIndex(@volatile private var _file:
File, val baseOffset:
@volatile
protected var mmap: MappedByteBuffer = {
val newlyCreated = file.createNewFile()
- val raf = if (writable) new RandomAccessFile(file, "rw") else new
RandomAccessFile(file, "r")
- try {
- /* pre-allocate the file if necessary */
- if(newlyCreated) {
- if(maxIndexSize < entrySize)
- throw new IllegalArgumentException("Invalid max index size: " +
maxIndexSize)
+ /* pre-allocate the file if necessary */
+ if(newlyCreated) {
+ if(maxIndexSize < entrySize)
+ throw new IllegalArgumentException("Invalid max index size: " +
maxIndexSize)
+ val raf = if (writable) new RandomAccessFile(file, "rw") else new
RandomAccessFile(file, "r")
Review Comment:
You can replace `RandomAccessFile` here with `FileChannel`, just as we did in
the other place, which eliminates the need to open the file twice.
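As a rough illustration of that idea, here is a Java sketch (the actual code is Scala) in which a single `FileChannel` both preallocates the index file and creates the mapping. The class name `IndexMmapSketch`, the method name `mapIndex`, the read-write-only mode, and the rounding of the length down to a multiple of `entrySize` are assumptions for illustration; only the `createNewFile()` call and the size check come from the hunk above.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical class and method names, used only for this sketch.
public final class IndexMmapSketch {
    private IndexMmapSketch() { }

    /** Assumes entrySize > 0 and a writable index. */
    public static MappedByteBuffer mapIndex(Path path, int maxIndexSize, int entrySize) throws IOException {
        final boolean newlyCreated = path.toFile().createNewFile();
        // One open call serves both preallocation and mapping.
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            if (newlyCreated) {
                if (maxIndexSize < entrySize)
                    throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize);
                // Assumed sizing rule: largest multiple of entrySize that fits in maxIndexSize.
                final long length = maxIndexSize - (maxIndexSize % entrySize);
                // Writing one zero byte at the last offset grows the file to `length` bytes.
                final ByteBuffer zero = ByteBuffer.allocate(1);
                while (zero.hasRemaining())
                    channel.write(zero, length - 1);
            }
            // The mapping stays valid after the channel is closed.
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, channel.size());
        }
    }
}
```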