[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17778731#comment-17778731 ]
Matthias J. Sax commented on KAFKA-15602:
-----------------------------------------

Btw: also left a comment on KAFKA-4852: https://issues.apache.org/jira/browse/KAFKA-4852

Breaking change in 3.4.0 ByteBufferSerializer
---------------------------------------------

                Key: KAFKA-15602
                URL: https://issues.apache.org/jira/browse/KAFKA-15602
            Project: Kafka
         Issue Type: Bug
         Components: producer
   Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
           Reporter: Luke Kirby
           Priority: Critical

[This PR|https://github.com/apache/kafka/pull/12683/files] claims to have solved the situation described by KAFKA-4852, namely, to have ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-zero offsets (or, put another way, to honor the buffer's position() as the start point to consume bytes from). Unfortunately, it failed to actually do this; instead, it changed the expectations for how an input ByteBuffer's limit and position should be set before being provided to send() on a producer configured with ByteBufferSerializer. Code that worked with pre-3.4.0 releases now produces 0-length messages instead of the intended messages, effectively introducing a breaking change for existing users of the serializer in the wild.
Here are a few different inputs and their serialized outputs under pre-3.4.0 and 3.4.0+ to summarize the breaking change:

||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
|ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
|ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
|ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
|ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
|ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|

Notably, plain wrappers of byte arrays continue to work under both versions due to the special case in the serializer for them. I suspect that this is the dominant use case, which is why this has apparently gone unreported to this point. The wrapped-with-offset case fails under both versions, for different reasons (the expected value would be "est"). As demonstrated here, you can ensure that a manually assembled ByteBuffer will work under both versions by ensuring that your buffers have position == limit == message-length (and an actual desired start position of 0). Clearly, though, behavior has changed dramatically for the second and third cases, with the 3.3.2 behavior, in my experience, aligning better with naive expectations.

[Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java], the serializer would just rewind() the buffer and respect the limit as the indicator of how much data was in the buffer. So, essentially, the prevailing contract was that the data from position 0 (always!) up to the limit on the buffer would be serialized; it was really just the limit that was honored.
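To make the two contracts concrete, here is a simplified model of the old and new serialization logic that reproduces the outputs in the table above. This is an illustrative sketch, not the actual Kafka source; the class and method names (ByteBufferSerializerModel, serializePre340, serializePost340) are my own:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferSerializerModel {

    // Pre-3.4.0 contract (model): always serialize bytes 0..limit, ignoring position.
    static byte[] serializePre340(ByteBuffer data) {
        if (data == null) return null;
        ByteBuffer view = data.duplicate(); // don't disturb the caller's buffer
        view.rewind();                      // position := 0, limit untouched
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    // 3.4.0+ contract (model): flip() first, i.e. serialize bytes 0..position,
    // except for the plain-wrap fast path, which returns the backing array.
    static byte[] serializePost340(ByteBuffer data) {
        if (data == null) return null;
        if (data.hasArray() && data.arrayOffset() == 0 && data.position() == 0
                && data.limit() == data.array().length) {
            return data.array();            // e.g. ByteBuffer.wrap("test".getBytes(UTF_8))
        }
        ByteBuffer view = data.duplicate();
        view.flip();                        // limit := old position, position := 0
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        // The second row of the table: a conventionally flip()ped buffer.
        ByteBuffer flipped = ByteBuffer.allocate(8);
        flipped.put("test".getBytes(StandardCharsets.UTF_8));
        flipped.flip(); // position=0, limit=4

        System.out.println(serializePre340(flipped).length);  // 4 ("test")
        System.out.println(serializePost340(flipped).length); // 0 (the regression)
    }
}
```

Running the model against each row of the table yields exactly the reported lengths and values, including the len=1 val=t result for the offset-wrapped case.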
So if, per the original issue, you have a byte[] array wrapped with, say, ByteBuffer.wrap(bytes, 3, 5), then that will yield a ByteBuffer with position = 3 indicating the desired start point to read from, but the position is effectively ignored by the serializer due to the rewind().

So while the serializer didn't work when presented with a ByteBuffer view onto a sub-view of a backing array, it did follow expected behavior when employing standard patterns to populate ByteBuffers backed by larger-than-necessary arrays and using limit() to identify the end of actual data, consistent with conventional usage of flip() to switch from writing to a buffer to setting it up to be read from (e.g., to be passed into a producer.send() call). E.g.,

{code:java}
ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
... // some sequence of
bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH
...
bb.flip(); /* logically, this says "I am done writing, let's set this up for
reading"; pragmatically, it sets the limit to the current position so that
whoever reads the buffer knows when to stop reading, and sets the position to
zero so it knows where to start reading from */
producer.send(bb);
{code}

Technically, you wouldn't even need to use flip() there, since position is ignored; it would be sufficient to just call {{bb.limit(bb.position())}}. Notably, a buffer constructed using any variant of ByteBuffer.wrap() is essentially immediately in read mode, with position indicating the start and limit the end.

With the change introduced in 3.4.0, however, the contract changes dramatically, and the code just presented produces a 0-byte message.
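The state transitions flip() performs can be verified directly with plain java.nio; this standalone snippet (no Kafka dependency) traces the position/limit values the paragraph above describes:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FlipDemo {
    public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.allocate(8);          // capacity 8
        bb.put("test".getBytes(StandardCharsets.UTF_8)); // writes 4 bytes

        // After the put: position marks the write cursor, limit == capacity.
        System.out.println(bb.position() + "/" + bb.limit()); // 4/8

        bb.flip(); // limit := old position (4), position := 0
        System.out.println(bb.position() + "/" + bb.limit()); // 0/4

        // Under the pre-3.4.0 contract only the limit mattered, so calling
        // bb.limit(bb.position()) alone (without moving position to 0) would
        // have been equivalent for serialization purposes.
    }
}
```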
As indicated above, it also continues to fail if you just passed in an offset-specified ByteBuffer.wrap()ped message, i.e., the case described by KAFKA-4852:

{code:java}
@Test
public void testByteBufferSerializerOnOffsetWrappedBytes() {
    final byte[] bytes = "Hello".getBytes(UTF_8);
    try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
        assertArrayEquals("ello".getBytes(UTF_8),
                // FAILS: this will yield "H", not "ello"
                serializer.serialize(topic, ByteBuffer.wrap(bytes, 1, bytes.length - 1)));
    }
}
{code}

What happened here?

The resulting PR, it seems, focused on a flawed proposed test case in the first comment of KAFKA-4852 that failed against pre-3.4.0 Kafka. I reproduce that here with commented annotations from me:

{code:java}
@Test // flawed proposed test case
public void testByteBufferSerializer() {
    final byte[] bytes = "Hello".getBytes(UTF_8);
    final ByteBuffer buffer = ByteBuffer.allocate(7);
    buffer.put(bytes);
    // buffer.flip(); <-- would make the test work
    try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
        assertArrayEquals(bytes, serializer.serialize(topic, buffer));
    }
}
{code}

In pre-3.4.0, this would yield a 7-byte serialization of "Hello" followed by 2 zero-value bytes. I contend that this was actually expected and correct behavior, as the ByteBuffer had never had its limit set, so the implicit and mildly expected contract was never actually abided by. If there had been a buffer.flip() after the .put(bytes) call, as the calling code _should_ have applied, then the test would have succeeded. In short, by trying to make this test case succeed, I think this PR represented nothing more than a misunderstanding of how one should prepare ByteBuffers for reading, but it has managed to result in a breaking change.
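To see concretely why the proposed test was flawed, plain java.nio shows what the "serialize 0..limit" contract yields with and without the missing flip(). This is a standalone illustration (no Kafka dependency); readZeroToLimit is a hypothetical helper mirroring the pre-3.4.0 behavior:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FlawedTestDemo {
    // Read bytes 0..limit, mirroring the pre-3.4.0 "limit is honored" contract.
    static byte[] readZeroToLimit(ByteBuffer data) {
        ByteBuffer view = data.duplicate(); // leave the caller's buffer untouched
        view.rewind();                      // position := 0, limit untouched
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(7);
        buffer.put("Hello".getBytes(StandardCharsets.UTF_8));

        // Without flip(): limit is still 7, so the payload is "Hello" plus two
        // zero bytes -- exactly the pre-3.4.0 result the flawed test objected to.
        System.out.println(readZeroToLimit(buffer).length); // 7

        buffer.flip(); // the step the test's calling code should have applied
        System.out.println(readZeroToLimit(buffer).length); // 5
    }
}
```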
The breaking nature of this was actually briefly noted in PR comments but was kind of shrugged off with some test changes and explanatory comments on the class.

Obviously a correction to restore 3.3.2 behavior would represent another breaking change for users that are running on 3.4+, unless they were also, somewhat surprisingly, configuring buffers for position() == limit() before passing them to send(). Arguably, it would also be a breaking change (though possibly not one of great consequence) if either version were changed to correctly handle the wrapped-with-offset case as represented in the original ticket.

I do not have much experience contending with a situation like this, but at the risk of jumping to a solution here, I wonder if the only way to really move forward safely and unambiguously is to remove ByteBufferSerializer as it stands and replace it with a differently named substitute that handles both the plain-wrapped special case and just serializes content from position() to limit(), forcing an evaluation by users when upgrading as to whether the provided byte buffer is correctly configured or not. Of course, a change like that would have to be released at an appropriate version level, too, so I don't know exactly what the desired interim behavior would be (deprecation?). I believe I would be eager to contribute to a fix, but obviously I would need guidance from maintainers regarding the correct path forward semantically.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
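As a rough sketch of what such a substitute might look like: this is entirely hypothetical (the class name and the zero-copy fast path are my assumptions, not an agreed design), serializing exactly the bytes between position() and limit() while preserving the plain-wrap special case:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical replacement sketch; not an actual Kafka class.
public class PositionRespectingByteBufferSerializer {

    // Serialize exactly the bytes between position() and limit().
    public byte[] serialize(String topic, ByteBuffer data) {
        if (data == null) return null;
        // Plain-wrap special case: the whole backing array is the payload,
        // so it can be returned without copying.
        if (data.hasArray() && data.arrayOffset() == 0 && data.position() == 0
                && data.limit() == data.array().length) {
            return data.array();
        }
        byte[] out = new byte[data.remaining()];
        data.duplicate().get(out); // duplicate() leaves the caller's position intact
        return out;
    }

    public static void main(String[] args) {
        byte[] bytes = "Hello".getBytes(StandardCharsets.UTF_8);
        PositionRespectingByteBufferSerializer s = new PositionRespectingByteBufferSerializer();
        // The KAFKA-4852 case: an offset wrap now yields "ello" as expected.
        System.out.println(new String(
                s.serialize("topic", ByteBuffer.wrap(bytes, 1, bytes.length - 1)),
                StandardCharsets.UTF_8)); // ello
    }
}
```

Under this contract, both conventionally flip()ped buffers and offset wraps serialize the intended bytes; only callers who relied on the rewind()-based "always start at 0" behavior would need to adjust.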