mjsax commented on code in PR #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r1058656046


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -45,34 +45,30 @@ public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serial
     }
 
     /**
-     * @throws StreamsException if both old and new values of data are null, or if
-     * both values are not null
+     * @throws StreamsException if both old and new values of data are null.
      */
     @Override
     public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
-        final byte[] serializedKey;
+        final boolean oldValueIsNull = data.oldValue == null;
+        final boolean newValueIsNull = data.newValue == null;
 
-        // only one of the old / new values would be not null
-        if (data.newValue != null) {
-            if (data.oldValue != null) {
-                throw new StreamsException("Both old and new values are not null (" + data.oldValue
-                    + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.newValue);
+        // both old and new values cannot be null
+        if (oldValueIsNull && newValueIsNull) {
+            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         } else {
-            if (data.oldValue == null) {
-                throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.oldValue);
+            final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue);
+            final byte[] oldData = oldValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.oldValue);

Review Comment:
   ```suggestion
               final byte[] oldData = oldValueIsNull ? null : inner.serialize(topic, headers, data.oldValue);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -45,34 +45,30 @@ public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serial
     }
 
     /**
-     * @throws StreamsException if both old and new values of data are null, or if
-     * both values are not null
+     * @throws StreamsException if both old and new values of data are null.
      */
     @Override
     public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
-        final byte[] serializedKey;
+        final boolean oldValueIsNull = data.oldValue == null;
+        final boolean newValueIsNull = data.newValue == null;
 
-        // only one of the old / new values would be not null
-        if (data.newValue != null) {
-            if (data.oldValue != null) {
-                throw new StreamsException("Both old and new values are not null (" + data.oldValue
-                    + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.newValue);
+        // both old and new values cannot be null
+        if (oldValueIsNull && newValueIsNull) {
+            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         } else {
-            if (data.oldValue == null) {
-                throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.oldValue);
+            final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue);

Review Comment:
   ```suggestion
               final byte[] newData = newValueIsNull ? null : inner.serialize(topic, headers, data.newValue);
   ```
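For illustration only, a minimal sketch of one way to keep `null` and an empty array distinguishable on the wire, which is what both suggestions above require. The layout (`[int newLength][newBytes][int oldLength][oldBytes]`, with a length of `-1` marking `null`), the class `ChangeEncodingSketch`, and its methods are hypothetical and are not the format this PR uses; `IllegalArgumentException` stands in for `StreamsException` so the example has no Kafka dependency. The distinction matters because an empty array can be a real serialized value (for example, a `StringSerializer` turns `""` into an empty array), so `new byte[0]` cannot double as the marker for an absent value.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical layout (illustration only): [int newLength][newBytes][int oldLength][oldBytes].
// A length of -1 marks a null value; a length of 0 marks a present but empty value.
public class ChangeEncodingSketch {

    static final int NULL_LENGTH = -1;

    static byte[] encode(final byte[] newData, final byte[] oldData) {
        if (newData == null && oldData == null) {
            // Stand-in for StreamsException, mirroring the check in ChangedSerializer.
            throw new IllegalArgumentException("Both old and new values are null, which is not allowed.");
        }
        final int size = 2 * Integer.BYTES
            + (newData == null ? 0 : newData.length)
            + (oldData == null ? 0 : oldData.length);
        final ByteBuffer buffer = ByteBuffer.allocate(size);
        putNullable(buffer, newData);
        putNullable(buffer, oldData);
        return buffer.array();
    }

    private static void putNullable(final ByteBuffer buffer, final byte[] bytes) {
        if (bytes == null) {
            buffer.putInt(NULL_LENGTH);  // null: length marker only, no payload
        } else {
            buffer.putInt(bytes.length); // present: actual length (0 for an empty array), then payload
            buffer.put(bytes);
        }
    }

    public static void main(final String[] args) {
        final byte[] v = "v".getBytes(StandardCharsets.UTF_8);
        final byte[] withNullOld = encode(v, null);
        final byte[] withEmptyOld = encode(v, new byte[0]);
        // Same total size (9 bytes), but the old-value length field (bytes 5..8) differs.
        System.out.println(ByteBuffer.wrap(withNullOld, 5, 4).getInt());  // prints -1
        System.out.println(ByteBuffer.wrap(withEmptyOld, 5, 4).getInt()); // prints 0
    }
}
```

A length prefix per value costs four extra bytes compared with a single length field, but it lets the reader tell "absent" from "empty" for both values without any side channel.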



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java:
##########
@@ -90,14 +90,19 @@ public void process(final K key, final Change<V> change) {
 
             // if the selected repartition key or value is null, skip
             // forward oldPair first, to be consistent with reduce and aggregate
-            if (oldPair != null && oldPair.key != null && oldPair.value != null) {
-                context().forward(oldPair.key, new Change<>(null, oldPair.value));
+            final boolean oldPairNotNull = oldPair != null && oldPair.key != null && oldPair.value != null;
+            final boolean newPairNotNull = newPair != null && newPair.key != null && newPair.value != null;
+            if (oldPairNotNull && newPairNotNull && oldPair.key == newPair.key) {

Review Comment:
   `oldPair.key == newPair.key` does not use `equals()`; it does an object reference comparison. We need to use `oldPair.key.equals(newPair.key)` (and test that `oldPair.key != null` to avoid an NPE).
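A minimal sketch of the difference, assuming a hypothetical `Pair` class that stands in for `KeyValue` (public `key`/`value` fields). Two distinct but equal `String` keys fail the `==` check and pass `equals()`; because the not-null checks already guarantee `oldPair.key != null`, the `equals()` call cannot throw.

```java
// Illustrates reference comparison (==) versus value comparison (equals()) for repartition keys.
public class SameKeyCheckSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.KeyValue.
    static final class Pair<K, V> {
        final K key;
        final V value;

        Pair(final K key, final V value) {
            this.key = key;
            this.value = value;
        }
    }

    static boolean isComplete(final Pair<?, ?> pair) {
        return pair != null && pair.key != null && pair.value != null;
    }

    // isComplete(oldPair) guarantees oldPair.key != null, so equals() cannot throw an NPE.
    static boolean sameRepartitionKey(final Pair<?, ?> oldPair, final Pair<?, ?> newPair) {
        return isComplete(oldPair) && isComplete(newPair) && oldPair.key.equals(newPair.key);
    }

    public static void main(final String[] args) {
        // new String(...) forces two distinct objects with equal contents.
        final Pair<String, Long> oldPair = new Pair<>(new String("region-1"), 3L);
        final Pair<String, Long> newPair = new Pair<>(new String("region-1"), 5L);
        System.out.println(oldPair.key == newPair.key);           // false
        System.out.println(sameRepartitionKey(oldPair, newPair)); // true
    }
}
```

Keys that come from deserialization or from a `KeyValueMapper` are usually fresh objects, so the `==` check would silently miss most same-key updates rather than fail loudly.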



##########
docs/streams/developer-guide/dsl-api.html:
##########
@@ -1050,8 +1050,8 @@ <h4 class="anchor-heading"><a id="streams_concepts_globalktable" class="anchor-l
                                         more than once for a key as a result of having received input tombstone records for that key (see below).</li>
                                     <li>When the first non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a key (e.g.,  INSERT), then only the adder is called.</li>
                                     <li>When subsequent non-<code class="docutils literal"><span class="pre">null</span></code> values are received for a key (e.g.,  UPDATE), then (1) the subtractor is
-                                        called with the old value as stored in the table and (2) the adder is called with the new value of the
-                                        input record that was just received.  The order of execution for the subtractor and adder is not defined.</li>
+                                        first called with the old value as stored in the table and then (2) the adder is called with the new value of the
+                                        input record that was just received.</li>

Review Comment:
   Well, for the regular case in which the key changes, the order is still not defined...
   
   We would need to be more precise and say:
   > If and only if the grouping keys extracted from the old and new values are the same, the subtractor will be called before the adder. The detection of this case depends on a correct implementation of the <code>equals()</code> method of the extracted key type.
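A hypothetical model of the proposed wording, not Kafka Streams code (`Rec`, `repartition`, and `process` are illustrative names, and the combined-record design is an assumption): upstream puts old and new value into one record only when `equals()` reports the same grouping key, and the aggregation applies the subtractor before the adder within a record. When the key changes, two separate records are produced and nothing about their relative order is implied.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the ordering rule for a grouped-table aggregation with adder and subtractor.
public class SubtractThenAddSketch {

    // One repartitioned record: grouping key plus optional new and old values.
    static final class Rec {
        final String key;
        final Long newValue;
        final Long oldValue;

        Rec(final String key, final Long newValue, final Long oldValue) {
            this.key = key;
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Upstream side: combine into one record only if equals() says the grouping keys match.
    static List<Rec> repartition(final String oldKey, final long oldValue, final String newKey, final long newValue) {
        final List<Rec> out = new ArrayList<>();
        if (oldKey.equals(newKey)) {
            out.add(new Rec(newKey, newValue, oldValue));
        } else {
            // Two independent records; they may land on different partitions,
            // so their relative processing order is not defined.
            out.add(new Rec(oldKey, null, oldValue));
            out.add(new Rec(newKey, newValue, null));
        }
        return out;
    }

    final Map<String, Long> sums = new HashMap<>();
    final List<String> trace = new ArrayList<>();

    // Aggregation side: within a single record the subtractor is always applied first.
    void process(final Rec rec) {
        if (rec.oldValue != null) {
            sums.merge(rec.key, -rec.oldValue, Long::sum);
            trace.add("subtract " + rec.key);
        }
        if (rec.newValue != null) {
            sums.merge(rec.key, rec.newValue, Long::sum);
            trace.add("add " + rec.key);
        }
    }

    public static void main(final String[] args) {
        final SubtractThenAddSketch agg = new SubtractThenAddSketch();
        agg.process(new Rec("a", 5L, null));                 // INSERT: adder only
        repartition("a", 5L, "a", 7L).forEach(agg::process); // UPDATE, same key: subtract, then add
        System.out.println(agg.trace); // [add a, subtract a, add a]
        System.out.println(agg.sums);  // {a=7}
    }
}
```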



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java:
##########
@@ -45,16 +46,17 @@ public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final De
 
     @Override
     public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
+        final ByteBuffer buffer = Serdes.ByteBuffer().deserializer().deserialize(topic, data);
+        final int newDataLength = buffer.getInt();
+        final int oldDataLength = data.length - newDataLength - NEW_DATA_LENGTH_BYTES_SIZE;
+        final byte[] newData = new byte[newDataLength];
+        final byte[] oldData = new byte[oldDataLength];
+        buffer.get(newData);
+        buffer.get(oldData);
 
-        final byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
-
-        System.arraycopy(data, 0, bytes, 0, bytes.length);
-
-        if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
-            return new Change<>(inner.deserialize(topic, headers, bytes), null);
-        } else {
-            return new Change<>(null, inner.deserialize(topic, headers, bytes));
-        }
+        return new Change<>(
+                newDataLength > 0 ? inner.deserialize(topic, headers, newData) : null,

Review Comment:
   An empty `ByteBuffer` and a `null` are not the same, and we should distinguish between them. Cf. my other comment on the serializer.
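The decoding side of a hypothetical length-prefixed layout, as a sketch (the layout `[int newLength][newBytes][int oldLength][oldBytes]` with `-1` meaning `null`, the class `ChangeDecodingSketch`, and its methods are assumptions, not this PR's code; `ByteBuffer.wrap` is used directly). A length of `-1` yields `null`, while a length of `0` yields an empty array that would still be handed to `inner.deserialize(...)`, unlike the `newDataLength > 0 ? ... : null` check in the diff, which folds both cases into `null`.

```java
import java.nio.ByteBuffer;

// Hypothetical layout (illustration only): [int newLength][newBytes][int oldLength][oldBytes],
// where a length of -1 means null and 0 means a present but empty value.
public class ChangeDecodingSketch {

    static final int NULL_LENGTH = -1;

    // Returns {newBytes, oldBytes}; an entry is null if the value was absent.
    static byte[][] decode(final byte[] data) {
        final ByteBuffer buffer = ByteBuffer.wrap(data);
        final byte[] newData = readNullable(buffer);
        final byte[] oldData = readNullable(buffer);
        return new byte[][] {newData, oldData};
    }

    private static byte[] readNullable(final ByteBuffer buffer) {
        final int length = buffer.getInt();
        if (length == NULL_LENGTH) {
            return null; // absent value: the inner deserializer would not be called
        }
        final byte[] bytes = new byte[length];
        buffer.get(bytes); // length 0 yields an empty array, which is still a value
        return bytes;
    }

    public static void main(final String[] args) {
        // newValue present but empty, oldValue absent.
        final byte[] encoded = ByteBuffer.allocate(2 * Integer.BYTES).putInt(0).putInt(NULL_LENGTH).array();
        final byte[][] decoded = decode(encoded);
        System.out.println(decoded[0].length);  // 0
        System.out.println(decoded[1] == null); // true
    }
}
```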



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java:
##########
@@ -18,13 +18,14 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
 
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {
 
-    private static final int NEWFLAG_SIZE = 1;
+    private static final int NEW_DATA_LENGTH_BYTES_SIZE = Integer.BYTES;

Review Comment:
   Why do we rename this? (I am just wondering what this constant actually is...)



-- 
This is an automated message from the Apache Git Service.