wcarlson5 commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1347987570


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
##########
@@ -51,8 +51,8 @@ public void init(final ProcessorContext<K, V> context) {
         public void process(final Record<K, V> record) {
             // if the key is null, we do not need to put the record into window store
             // since it will never be considered for join operations
+            context().forward(record);
             if (record.key() != null) {
-                context().forward(record);
                 // Every record basically starts a new window. We're using a window store mostly for the retention.
                 window.put(record.key(), record.value(), record.timestamp());

Review Comment:
   Why should null keys not enter the window?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##########
@@ -39,22 +39,26 @@ public static <KIn, VIn, KOut, VOut> boolean skipRecord(
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
         if (record.key() == null || record.value() == null) {
-            if (context.recordMetadata().isPresent()) {
-                final RecordMetadata recordMetadata = context.recordMetadata().get();
-                logger.warn(
-                    "Skipping record due to null key or value. "
-                        + "topic=[{}] partition=[{}] offset=[{}]",
-                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                );
-            } else {
-                logger.warn(
-                    "Skipping record due to null key or value. Topic, partition, and offset not known."
-                );
-            }
-            droppedRecordsSensor.record();
+            dropRecord(logger, droppedRecordsSensor, context);
             return true;
         } else {
             return false;
         }
     }
+
+    public static <KOut, VOut> void dropRecord(final Logger logger, final Sensor droppedRecordsSensor, final ProcessorContext<KOut, VOut> context) {

Review Comment:
   I'm not a huge fan of splitting this out into a separate public method. I think you can just reuse the logic in skipRecord.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -124,17 +124,20 @@ public void init(final ProcessorContext<K, VOut> context) {
         @SuppressWarnings("unchecked")
         @Override
         public void process(final Record<K, V1> record) {
-            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-                return;
-            }
-            boolean needOuterJoin = outer;
-
             final long inputRecordTimestamp = record.timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
             sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+            if (outer && record.key() == null && record.value() != null) {

Review Comment:
   What about inner and left joins? Do those values go into the window? If so, why?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
             LOG.debug("Optimizing the Kafka Streams graph for self-joins");
             rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
         }
+        LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+        rewriteRepartitionNodes();
     }
 
+    private void rewriteRepartitionNodes() {

Review Comment:
   This is to prevent null keys from going into repartition topics, right? Will that affect results if a manual repartition is added?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -124,17 +124,20 @@ public void init(final ProcessorContext<K, VOut> context) {
         @SuppressWarnings("unchecked")
         @Override
         public void process(final Record<K, V1> record) {
-            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
-                return;
-            }
-            boolean needOuterJoin = outer;
-
             final long inputRecordTimestamp = record.timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
             sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+            if (outer && record.key() == null && record.value() != null) {
+                context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
+                return;
+            } else if (record.key() == null || record.value() == null) {

Review Comment:
   This is the check already in skipRecord; can you use that instead?
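
   A minimal sketch of what this suggestion could look like, assuming the only goal is to avoid repeating the null test inline. `NullKeyJoinSketch`, `Action`, and the two-argument `skipRecord` below are hypothetical stand-ins for illustration, not Kafka Streams code; the real `StreamStreamJoinUtil.skipRecord` in the diff above also takes the logger, sensor, and context, and logs and records the drop.

```java
// Hypothetical, simplified stand-in for the join processor; not Kafka Streams code.
class NullKeyJoinSketch {

    // What the processor would do with a record (illustrative only).
    enum Action { FORWARD_UNJOINED, DROP, JOIN }

    // Same condition as the null check in skipRecord shown in the diff.
    static boolean skipRecord(final Object key, final Object value) {
        return key == null || value == null;
    }

    static Action process(final boolean outer, final Object key, final Object value) {
        // Outer-join special case from the PR: a null key with a non-null value
        // is forwarded without a join partner.
        if (outer && key == null && value != null) {
            return Action.FORWARD_UNJOINED;
        }
        // Reuse the shared check instead of repeating the null tests here.
        if (skipRecord(key, value)) {
            return Action.DROP;
        }
        return Action.JOIN;
    }

    public static void main(final String[] args) {
        System.out.println(process(true, null, "v"));
        System.out.println(process(false, null, "v"));
        System.out.println(process(true, "k", null));
        System.out.println(process(true, "k", "v"));
    }
}
```

   Keeping the outer-join branch first means the shared check only sees records that are not handled by the special case.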


