This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9c8aaa2c35a MINOR: Fix lossy conversions flagged by Java 20 (#13582)
9c8aaa2c35a is described below
commit 9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419
Author: Ismael Juma <[email protected]>
AuthorDate: Thu Jun 22 08:05:55 2023 -0700
MINOR: Fix lossy conversions flagged by Java 20 (#13582)
An example of the warning:
> warning: [lossy-conversions] implicit cast from long to int in compound assignment is possibly lossy
There should be no change in behavior as part of these changes - runtime logic ensured we didn't run into issues due to the lossy conversions.
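For illustration, here is a minimal standalone sketch (not taken from this patch; the class and variable names are hypothetical) of the kind of compound assignment javac reports under the `lossy-conversions` lint category, together with the explicit-cast form applied throughout this change:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    class LossyConversionSketch {
        public static void main(String[] args) throws IOException {
            InputStream in = new ByteArrayInputStream(new byte[64]);
            int bytesToSkip = 64;
            // InputStream#skip returns a long while bytesToSkip is an int, so the
            // compound assignment narrows implicitly; javac 20+ reports:
            //   [lossy-conversions] implicit cast from long to int in compound
            //   assignment is possibly lossy
            bytesToSkip -= in.skip(10);
            // The pattern used throughout this patch: make the narrowing explicit,
            // which is behavior-preserving when the value is known to fit in an int.
            bytesToSkip -= (int) in.skip(10);
            System.out.println(bytesToSkip);
        }
    }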
Reviewers: Divij Vaidya <[email protected]>
---
.../org/apache/kafka/common/record/CompressionType.java | 16 +++++++++-------
.../org/apache/kafka/common/record/DefaultRecord.java | 2 +-
.../apache/kafka/common/record/DefaultRecordBatch.java | 2 +-
.../apache/kafka/common/record/DefaultRecordsSend.java | 2 +-
.../java/org/apache/kafka/common/record/FileRecords.java | 7 ++++---
.../common/record/LazyDownConversionRecordsSend.java | 5 +++--
.../org/apache/kafka/common/record/LegacyRecord.java | 4 ++--
.../org/apache/kafka/common/record/MemoryRecords.java | 8 +++-----
.../org/apache/kafka/common/record/MultiRecordsSend.java | 2 +-
.../java/org/apache/kafka/common/record/RecordsSend.java | 8 ++++----
.../apache/kafka/common/record/TransferableRecords.java | 2 +-
.../apache/kafka/common/record/UnalignedFileRecords.java | 7 ++++---
.../kafka/common/record/UnalignedMemoryRecords.java | 8 +++-----
.../kafka/common/serialization/ShortDeserializer.java | 2 +-
.../main/java/org/apache/kafka/common/utils/Utils.java | 2 +-
.../org/apache/kafka/common/compress/KafkaLZ4Test.java | 2 +-
.../org/apache/kafka/common/metrics/stats/MeterTest.java | 2 +-
.../org/apache/kafka/common/record/FileRecordsTest.java | 10 +++++-----
.../kafka/streams/processor/internals/TaskExecutor.java | 2 +-
19 files changed, 47 insertions(+), 46 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 70ffc0ec1bc..a4ebf1648ef 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -37,7 +37,7 @@ import java.util.zip.GZIPOutputStream;
* The compression type to use
*/
public enum CompressionType {
- NONE(0, "none", 1.0f) {
+ NONE((byte) 0, "none", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
return buffer;
@@ -50,7 +50,7 @@ public enum CompressionType {
},
// Shipped with the JDK
- GZIP(1, "gzip", 1.0f) {
+ GZIP((byte) 1, "gzip", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
@@ -92,7 +92,7 @@ public enum CompressionType {
// To ensure this, we only reference compression library code from classes that are only invoked when actual usage
// happens.
- SNAPPY(2, "snappy", 1.0f) {
+ SNAPPY((byte) 2, "snappy", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
return SnappyFactory.wrapForOutput(buffer);
@@ -114,7 +114,7 @@ public enum CompressionType {
}
},
- LZ4(3, "lz4", 1.0f) {
+ LZ4((byte) 3, "lz4", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
@@ -144,7 +144,7 @@ public enum CompressionType {
}
},
- ZSTD(4, "zstd", 1.0f) {
+ ZSTD((byte) 4, "zstd", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
return ZstdFactory.wrapForOutput(buffer);
@@ -169,11 +169,13 @@ public enum CompressionType {
};
- public final int id;
+ // compression type is represented by two bits in the attributes field of the record batch header, so `byte` is
+ // large enough
+ public final byte id;
public final String name;
public final float rate;
- CompressionType(int id, String name, float rate) {
+ CompressionType(byte id, String name, float rate) {
this.id = id;
this.name = name;
this.rate = rate;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index ee2ef764728..f10fb246c64 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -431,7 +431,7 @@ public class DefaultRecord implements Record {
// Starting JDK 12, this implementation could be replaced by InputStream#skipNBytes
while (bytesToSkip > 0) {
- long ns = in.skip(bytesToSkip);
+ int ns = (int) in.skip(bytesToSkip);
if (ns > 0 && ns <= bytesToSkip) {
// adjust number to skip
bytesToSkip -= ns;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index a671ac18477..b1b8a2ad6a9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -428,7 +428,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
if (isControl)
attributes |= CONTROL_FLAG_MASK;
if (type.id > 0)
- attributes |= COMPRESSION_CODEC_MASK & type.id;
+ attributes |= (byte) (COMPRESSION_CODEC_MASK & type.id);
if (timestampType == TimestampType.LOG_APPEND_TIME)
attributes |= TIMESTAMP_TYPE_MASK;
if (isDeleteHorizonSet)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
index bbb17d4b460..493df189e0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
@@ -30,7 +30,7 @@ public class DefaultRecordsSend<T extends TransferableRecords> extends RecordsSe
}
@Override
- protected long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException {
+ protected int writeTo(TransferableChannel channel, int previouslyWritten, int remaining) throws IOException {
return records().writeTo(channel, previouslyWritten, remaining);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 17a41e2a744..6ff9b390965 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -291,7 +291,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
}
@Override
- public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
+ public int writeTo(TransferableChannel destChannel, int offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
int oldSize = sizeInBytes();
if (newSize < oldSize)
@@ -300,8 +300,9 @@ public class FileRecords extends AbstractRecords implements Closeable {
file.getAbsolutePath(), oldSize, newSize));
long position = start + offset;
- long count = Math.min(length, oldSize - offset);
- return destChannel.transferFrom(channel, position, count);
+ int count = Math.min(length, oldSize - offset);
+ // safe to cast to int since `count` is an int
+ return (int) destChannel.transferFrom(channel, position, count);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
index 01176518457..f5f8dcecb67 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
@@ -67,7 +67,7 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
}
@Override
- public long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException {
+ public int writeTo(TransferableChannel channel, int previouslyWritten, int remaining) throws IOException {
if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
MemoryRecords convertedRecords;
@@ -93,7 +93,8 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
convertedRecordsWriter = new DefaultRecordsSend<>(convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
}
- return convertedRecordsWriter.writeTo(channel);
+ // safe to cast to int since `remaining` is an int
+ return (int) convertedRecordsWriter.writeTo(channel);
}
public RecordConversionStats recordConversionStats() {
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
index eb852f5df71..f016cbcbc76 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
@@ -76,7 +76,7 @@ public final class LegacyRecord {
* Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
* compression
*/
- private static final int COMPRESSION_CODEC_MASK = 0x07;
+ private static final byte COMPRESSION_CODEC_MASK = 0x07;
/**
* Specify the mask of timestamp type: 0 for CreateTime, 1 for LogAppendTime.
@@ -497,7 +497,7 @@ public final class LegacyRecord {
public static byte computeAttributes(byte magic, CompressionType type, TimestampType timestampType) {
byte attributes = 0;
if (type.id > 0)
- attributes |= COMPRESSION_CODEC_MASK & type.id;
+ attributes |= (byte) (COMPRESSION_CODEC_MASK & type.id);
if (magic > RecordBatch.MAGIC_VALUE_V0) {
if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for " +
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index eacc2113b00..fa18a88ca79 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -69,14 +69,12 @@ public class MemoryRecords extends AbstractRecords {
}
@Override
- public long writeTo(TransferableChannel channel, long position, int length) throws IOException {
- if (position > Integer.MAX_VALUE)
- throw new IllegalArgumentException("position should not be greater than Integer.MAX_VALUE: " + position);
- if (position + length > buffer.limit())
+ public int writeTo(TransferableChannel channel, int position, int length) throws IOException {
+ if (((long) position) + length > buffer.limit())
throw new IllegalArgumentException("position+length should not be greater than buffer.limit(), position: "
+ position + ", length: " + length + ", buffer.limit(): " + buffer.limit());
- return Utils.tryWriteTo(channel, (int) position, length, buffer);
+ return Utils.tryWriteTo(channel, position, length, buffer);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
index 22883b278a3..e12cc58e00e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
@@ -87,7 +87,7 @@ public class MultiRecordsSend implements Send {
if (completed())
throw new KafkaException("This operation cannot be invoked on a complete request.");
- int totalWrittenPerCall = 0;
+ long totalWrittenPerCall = 0;
boolean sendComplete;
do {
long written = current.writeTo(channel);
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
index b582ec2d461..eb6e1b2ce74 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
@@ -44,10 +44,10 @@ public abstract class RecordsSend<T extends BaseRecords> implements Send {
@Override
public final long writeTo(TransferableChannel channel) throws IOException {
- long written = 0;
+ int written = 0;
if (remaining > 0) {
- written = writeTo(channel, size() - remaining, remaining);
+ written = writeTo(channel, maxBytesToWrite - remaining, remaining);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
@@ -75,10 +75,10 @@ public abstract class RecordsSend<T extends BaseRecords> implements Send {
* the to maximum bytes we want to write the to `channel`. `previouslyWritten` and `remaining` will be adjusted
* appropriately for every subsequent invocation. See {@link #writeTo} for example expected usage.
* @param channel The channel to write to
- * @param previouslyWritten Bytes written in previous calls to {@link #writeTo(TransferableChannel, long, int)}; 0 if being called for the first time
+ * @param previouslyWritten Bytes written in previous calls to {@link #writeTo(TransferableChannel, int, int)}; 0 if being called for the first time
* @param remaining Number of bytes remaining to be written
* @return The number of bytes actually written
* @throws IOException For any IO errors
*/
- protected abstract long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException;
+ protected abstract int writeTo(TransferableChannel channel, int previouslyWritten, int remaining) throws IOException;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java b/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java
index 09c0304a0c2..c0b3c0a8823 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java
@@ -35,5 +35,5 @@ public interface TransferableRecords extends BaseRecords {
* @return The number of bytes actually written
* @throws IOException For any IO errors
*/
- long writeTo(TransferableChannel channel, long position, int length) throws IOException;
+ int writeTo(TransferableChannel channel, int position, int length) throws IOException;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java
index 96970f992bb..57f3d2b358b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java
@@ -42,9 +42,10 @@ public class UnalignedFileRecords implements UnalignedRecords {
}
@Override
- public long writeTo(TransferableChannel destChannel, long previouslyWritten, int remaining) throws IOException {
+ public int writeTo(TransferableChannel destChannel, int previouslyWritten, int remaining) throws IOException {
long position = this.position + previouslyWritten;
- long count = Math.min(remaining, sizeInBytes() - previouslyWritten);
- return destChannel.transferFrom(channel, position, count);
+ int count = Math.min(remaining, sizeInBytes() - previouslyWritten);
+ // safe to cast to int since `count` is an int
+ return (int) destChannel.transferFrom(channel, position, count);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java
index 23795e30648..ee37bb43b4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java
@@ -44,13 +44,11 @@ public class UnalignedMemoryRecords implements UnalignedRecords {
}
@Override
- public long writeTo(TransferableChannel channel, long position, int length) throws IOException {
- if (position > Integer.MAX_VALUE)
- throw new IllegalArgumentException("position should not be greater than Integer.MAX_VALUE: " + position);
- if (position + length > buffer.limit())
+ public int writeTo(TransferableChannel channel, int position, int length) throws IOException {
+ if (((long) position) + length > buffer.limit())
throw new IllegalArgumentException("position+length should not be greater than buffer.limit(), position: "
+ position + ", length: " + length + ", buffer.limit(): " + buffer.limit());
- return Utils.tryWriteTo(channel, (int) position, length, buffer);
+ return Utils.tryWriteTo(channel, position, length, buffer);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
index 42924fb77af..3bca2c977cb 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
@@ -34,7 +34,7 @@ public class ShortDeserializer implements Deserializer<Short> {
short value = 0;
for (byte b : data) {
value <<= 8;
- value |= b & 0xFF;
+ value |= (short) (b & 0xFF);
}
return value;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index cec407fcbd8..ada1cafed49 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1262,7 +1262,7 @@ public final class Utils {
* @return The length of the actual written data
* @throws IOException If an I/O error occurs
*/
- public static long tryWriteTo(TransferableChannel destChannel,
+ public static int tryWriteTo(TransferableChannel destChannel,
int position,
int length,
ByteBuffer sourceBuffer) throws IOException {
diff --git a/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
index c3692fd112f..7c83ec79ce0 100644
--- a/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
+++ b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
@@ -306,7 +306,7 @@ public class KafkaLZ4Test {
args.ignoreFlagDescriptorChecksum);
int n = 100;
- int remaining = args.payload.length;
+ long remaining = args.payload.length;
long skipped = in.skip(n);
assertEquals(Math.min(n, remaining), skipped);
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
index 8d33e6176ae..1d88ba24bc4 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
@@ -50,7 +50,7 @@ public class MeterTest {
double nextValue = 0.0;
double expectedTotal = 0.0;
long now = 0;
- double intervalMs = 100;
+ int intervalMs = 100;
double delta = 5.0;
// Record values in multiple windows and verify that rates are reported
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 2fa978e10fa..74b7e2ff137 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -536,19 +536,19 @@ public class FileRecordsTest {
public void testBytesLengthOfWriteTo() throws IOException {
int size = fileRecords.sizeInBytes();
- long firstWritten = size / 3;
+ int firstWritten = size / 3;
TransferableChannel channel = Mockito.mock(TransferableChannel.class);
// Firstly we wrote some of the data
- fileRecords.writeTo(channel, 0, (int) firstWritten);
- verify(channel).transferFrom(any(), anyLong(), eq(firstWritten));
+ fileRecords.writeTo(channel, 0, firstWritten);
+ verify(channel).transferFrom(any(), anyLong(), eq((long) firstWritten));
// Ensure (length > size - firstWritten)
- int secondWrittenLength = size - (int) firstWritten + 1;
+ int secondWrittenLength = size - firstWritten + 1;
fileRecords.writeTo(channel, firstWritten, secondWrittenLength);
// But we still only write (size - firstWritten), which is not fulfilled in the old version
- verify(channel).transferFrom(any(), anyLong(), eq(size - firstWritten));
+ verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten));
}
private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 20c7316c4c1..56359676718 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -88,7 +88,7 @@ public class TaskExecutor {
return totalProcessed;
}
- private long processTask(final Task task, final int maxNumRecords, final long begin, final Time time) {
+ private int processTask(final Task task, final int maxNumRecords, final long begin, final Time time) {
int processed = 0;
long now = begin;