wcarlson5 commented on code in PR #13942:
URL: https://github.com/apache/kafka/pull/13942#discussion_r1259980518


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java:
##########
@@ -64,6 +70,16 @@ public void writeToTopology(final InternalTopologyBuilder 
topologyBuilder) {
         // Steam - KTable join only
         if (otherJoinSideNodeName != null) {
             topologyBuilder.connectProcessorAndStateStores(processorName, 
storeNames);
+            if (gracePeriod != null) {
+                for (final String storeName : storeNames) {
+                    if (!topologyBuilder.isStoreVersioned(storeName)) {
+                        throw new IllegalArgumentException("KTable must be 
versioned to use a grace period in a stream table join.");
+                    }
+                    if (gracePeriod.toMillis() > 
topologyBuilder.getHistoryRetention(storeName)) {
+                        throw new IllegalArgumentException("History history 
retention must be at least grace period, but it should be larger.");

Review Comment:
   This change is fine, just equal is enough, if not recommended



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to