This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1f5aea2a864 MINOR: remove get prefix for internal DSL methods (#17050)
1f5aea2a864 is described below

commit 1f5aea2a864e873dd4c0207d8039fe3fd24c66a5
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sun Sep 1 02:14:51 2024 -0700

    MINOR: remove get prefix for internal DSL methods (#17050)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  8 +--
 .../kstream/internals/KStreamKStreamJoin.java      |  8 +--
 .../internals/KStreamKStreamJoinLeftSide.java      |  5 +-
 .../internals/KStreamKStreamJoinRightSide.java     |  5 +-
 .../streams/kstream/internals/KTableImpl.java      |  4 +-
 .../kstream/internals/KTableKTableJoinMerger.java  |  2 +-
 .../internals/foreignkeyjoin/CombinedKey.java      |  4 +-
 .../ForeignTableJoinProcessorSupplier.java         |  6 +--
 .../ResponseJoinProcessorSupplier.java             |  8 +--
 .../SubscriptionJoinProcessorSupplier.java         | 28 +++++-----
 .../SubscriptionReceiveProcessorSupplier.java      | 12 ++---
 .../SubscriptionResponseWrapper.java               |  8 +--
 .../SubscriptionResponseWrapperSerde.java          | 10 ++--
 .../foreignkeyjoin/SubscriptionWrapper.java        | 12 ++---
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   | 20 ++++----
 .../internals/graph/BaseRepartitionNode.java       |  2 +-
 .../internals/graph/KTableKTableJoinNode.java      |  2 +-
 .../internals/graph/StreamStreamJoinNode.java      |  4 +-
 .../internals/suppress/BufferConfigInternal.java   |  8 +--
 .../internals/suppress/EagerBufferConfigImpl.java  |  8 +--
 .../internals/suppress/StrictBufferConfigImpl.java | 12 ++---
 .../foreignkeyjoin/CombinedKeySchemaTest.java      |  4 +-
 .../SubscriptionJoinProcessorSupplierTest.java     | 16 +++---
 .../SubscriptionResponseWrapperSerdeTest.java      | 24 ++++-----
 .../SubscriptionWrapperSerdeTest.java              | 60 +++++++++++-----------
 25 files changed, 139 insertions(+), 141 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 13a8d64b66d..92dde06e9c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -364,10 +364,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     private void rewriteRepartitionNodes() {
         final Set<BaseRepartitionNode<?, ?>> nodes = new NodesWithRelaxedNullKeyJoinDownstream(root).find();
         for (final BaseRepartitionNode<?, ?> partitionNode : nodes) {
-            if (partitionNode.getProcessorParameters() != null) {
+            if (partitionNode.processorParameters() != null) {
                 partitionNode.setProcessorParameters(new ProcessorParameters<>(
                     new KStreamFilter<>((k, v) -> k != null, false),
-                    partitionNode.getProcessorParameters().processorName()
+                    partitionNode.processorParameters().processorName()
                 ));
             }
         }
@@ -445,9 +445,9 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
             GraphNode left = null, right = null;
             for (final GraphNode child: parent.children()) {
                 if (child instanceof WindowedStreamProcessorNode && child.buildPriority() < joinNode.buildPriority()) {
-                    if (child.nodeName().equals(joinNode.getThisWindowedStreamProcessorParameters().processorName())) {
+                    if (child.nodeName().equals(joinNode.thisWindowedStreamProcessorParameters().processorName())) {
                         left = child;
-                    } else if (child.nodeName().equals(joinNode.getOtherWindowedStreamProcessorParameters().processorName())) {
+                    } else if (child.nodeName().equals(joinNode.otherWindowedStreamProcessorParameters().processorName())) {
                         right = child;
                     }
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 9ba8f316271..d3c0c4c0a22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -170,9 +170,9 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, 
VThis, VOther> impleme
 
         protected abstract TimestampedKeyAndJoinSide<K> makeOtherKey(final K 
key, final long timestamp);
 
-        protected abstract VThis getThisValue(final LeftOrRightValue<? extends 
VLeft, ? extends VRight> leftOrRightValue);
+        protected abstract VThis thisValue(final LeftOrRightValue<? extends 
VLeft, ? extends VRight> leftOrRightValue);
 
-        protected abstract VOther getOtherValue(final LeftOrRightValue<? 
extends VLeft, ? extends VRight> leftOrRightValue);
+        protected abstract VOther otherValue(final LeftOrRightValue<? extends 
VLeft, ? extends VRight> leftOrRightValue);
 
         private void emitNonJoinedOuterRecords(final 
KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> 
store,
                                                final Record<K, VThis> record) {
@@ -252,8 +252,8 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, 
VThis, VOther> impleme
                                                   final 
LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
             final K key = timestampedKeyAndJoinSide.key();
             final long timestamp = timestampedKeyAndJoinSide.timestamp();
-            final VThis thisValue = getThisValue(leftOrRightValue);
-            final VOther otherValue = getOtherValue(leftOrRightValue);
+            final VThis thisValue = thisValue(leftOrRightValue);
+            final VOther otherValue = otherValue(leftOrRightValue);
             final VOut nullJoinedValue = joiner.apply(key, thisValue, 
otherValue);
             context().forward(
                     
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
index df03e39b8c5..1a1fbdffb2d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
@@ -58,13 +58,12 @@ class KStreamKStreamJoinLeftSide<K, VLeft, VRight, VOut> 
extends KStreamKStreamJ
             return TimestampedKeyAndJoinSide.makeRight(key, timestamp);
         }
 
-        @Override
-        public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
+        public VLeft thisValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
             return leftOrRightValue.leftValue();
         }
 
         @Override
-        public VRight getOtherValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
+        public VRight otherValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
             return leftOrRightValue.rightValue();
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
index ec29d5f12b8..d09593f3b28 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
@@ -57,13 +57,12 @@ class KStreamKStreamJoinRightSide<K, VLeft, VRight, VOut> 
extends KStreamKStream
             return TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
         }
 
-        @Override
-        public VRight getThisValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
+        public VRight thisValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
             return leftOrRightValue.rightValue();
         }
 
         @Override
-        public VLeft getOtherValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
+        public VLeft otherValue(final LeftOrRightValue<? extends VLeft, ? 
extends VRight> leftOrRightValue) {
             return leftOrRightValue.leftValue();
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f06059f7c0a..09efdb78006 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -573,7 +573,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, 
Change<V>>> storeBuilder;
 
         if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
-            final Map<String, String> topicConfig = 
suppressedInternal.bufferConfig().getLogConfig();
+            final Map<String, String> topicConfig = 
suppressedInternal.bufferConfig().logConfig();
             storeBuilder = new 
InMemoryTimeOrderedKeyValueChangeBuffer.Builder<>(
                 storeName,
                 keySerde,
@@ -1165,7 +1165,7 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
 
         final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> 
defaultForeignResponseSinkPartitioner =
                 (topic, key, subscriptionResponseWrapper, numPartitions) -> {
-                    final Integer partition = 
subscriptionResponseWrapper.getPrimaryPartition();
+                    final Integer partition = 
subscriptionResponseWrapper.primaryPartition();
                     return partition == null ? Optional.empty() : 
Optional.of(Collections.singleton(partition));
                 };
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 9d381635586..7924f8ea857 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -44,7 +44,7 @@ public class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K,
         this.queryableName = queryableName;
     }
 
-    public String getQueryableName() {
+    public String queryableName() {
         return queryableName;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
index 01e94da345a..016f24cb4a0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
@@ -29,11 +29,11 @@ public class CombinedKey<KF, KP> {
         this.primaryKey = primaryKey;
     }
 
-    public KF getForeignKey() {
+    public KF foreignKey() {
         return foreignKey;
     }
 
-    public KP getPrimaryKey() {
+    public KP primaryKey() {
         return primaryKey;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
index cd7f13f81e2..7c3e982a8ed 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
@@ -123,11 +123,11 @@ public class ForeignTableJoinProcessorSupplier<K, KO, VO> 
implements
                     if (prefixEquals(next.key.get(), prefixBytes.get())) {
                         final CombinedKey<KO, K> combinedKey = 
keySchema.fromBytes(next.key);
                         context().forward(
-                            record.withKey(combinedKey.getPrimaryKey())
+                            record.withKey(combinedKey.primaryKey())
                                 .withValue(new SubscriptionResponseWrapper<>(
-                                    next.value.value().getHash(),
+                                    next.value.value().hash(),
                                     record.value().newValue,
-                                    next.value.value().getPrimaryPartition()))
+                                    next.value.value().primaryPartition()))
                         );
                     }
                 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
index 6a55d3e85f6..50c2b356344 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
@@ -99,7 +99,7 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR> 
implements ProcessorSup
 
             @Override
             public void process(final Record<K, 
SubscriptionResponseWrapper<VO>> record) {
-                if (record.value().getVersion() != 
SubscriptionResponseWrapper.CURRENT_VERSION) {
+                if (record.value().version() != 
SubscriptionResponseWrapper.CURRENT_VERSION) {
                     //Guard against modifications to 
SubscriptionResponseWrapper. Need to ensure that there is
                     //compatibility with previous versions to enable rolling 
upgrades. Must develop a strategy for
                     //upgrading from older SubscriptionWrapper versions to 
newer versions.
@@ -111,16 +111,16 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR> 
implements ProcessorSup
                     null :
                     
Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, 
currentValueWithTimestamp.value()));
 
-                final long[] messageHash = 
record.value().getOriginalValueHash();
+                final long[] messageHash = record.value().originalValueHash();
 
                 //If this value doesn't match the current value from the 
original table, it is stale and should be discarded.
                 if (java.util.Arrays.equals(messageHash, currentHash)) {
                     final VR result;
 
-                    if (record.value().getForeignValue() == null && (!leftJoin 
|| currentValueWithTimestamp == null)) {
+                    if (record.value().foreignValue() == null && (!leftJoin || 
currentValueWithTimestamp == null)) {
                         result = null; //Emit tombstone
                     } else {
-                        result = joiner.apply(currentValueWithTimestamp == 
null ? null : currentValueWithTimestamp.value(), 
record.value().getForeignValue());
+                        result = joiner.apply(currentValueWithTimestamp == 
null ? null : currentValueWithTimestamp.value(), record.value().foreignValue());
                     }
                     context().forward(record.withValue(result));
                 } else {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
index 388b669e988..008ef665817 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
@@ -70,7 +70,7 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
                 Objects.requireNonNull(valueAndTimestamp, "This processor 
should never see a null newValue.");
                 final SubscriptionWrapper<K> value = valueAndTimestamp.value();
 
-                if (value.getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
+                if (value.version() > SubscriptionWrapper.CURRENT_VERSION) {
                     //Guard against modifications to SubscriptionWrapper. Need 
to ensure that there is compatibility
                     //with previous versions to enable rolling upgrades. Must 
develop a strategy for upgrading
                     //from older SubscriptionWrapper versions to newer 
versions.
@@ -78,23 +78,23 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
                 }
 
                 final ValueAndTimestamp<VO> foreignValueAndTime =
-                    record.key().getForeignKey() == null ?
+                    record.key().foreignKey() == null ?
                         null :
-                        foreignValues.get(record.key().getForeignKey());
+                        foreignValues.get(record.key().foreignKey());
 
                 final long resultTimestamp =
                     foreignValueAndTime == null ?
                         valueAndTimestamp.timestamp() :
                         Math.max(valueAndTimestamp.timestamp(), 
foreignValueAndTime.timestamp());
 
-                switch (value.getInstruction()) {
+                switch (value.instruction()) {
                     case DELETE_KEY_AND_PROPAGATE:
                         context().forward(
-                            record.withKey(record.key().getPrimaryKey())
+                            record.withKey(record.key().primaryKey())
                                 .withValue(new SubscriptionResponseWrapper<VO>(
-                                    value.getHash(),
+                                    value.hash(),
                                     null,
-                                    value.getPrimaryPartition()
+                                    value.primaryPartition()
                                 ))
                                 .withTimestamp(resultTimestamp)
                         );
@@ -106,11 +106,11 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
                         final VO valueToSend = foreignValueAndTime == null ? 
null : foreignValueAndTime.value();
 
                         context().forward(
-                            record.withKey(record.key().getPrimaryKey())
+                            record.withKey(record.key().primaryKey())
                                 .withValue(new SubscriptionResponseWrapper<>(
-                                        value.getHash(),
+                                        value.hash(),
                                         valueToSend,
-                                        value.getPrimaryPartition()
+                                        value.primaryPartition()
                                 ))
                                 .withTimestamp(resultTimestamp)
                         );
@@ -118,11 +118,11 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
                     case PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE:
                         if (foreignValueAndTime != null) {
                             context().forward(
-                                record.withKey(record.key().getPrimaryKey())
+                                record.withKey(record.key().primaryKey())
                                    .withValue(new 
SubscriptionResponseWrapper<>(
-                                       value.getHash(),
+                                       value.hash(),
                                        foreignValueAndTime.value(),
-                                       value.getPrimaryPartition()
+                                       value.primaryPartition()
                                    ))
                                    .withTimestamp(resultTimestamp)
                             );
@@ -131,7 +131,7 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
                     case DELETE_KEY_NO_PROPAGATE:
                         break;
                     default:
-                        throw new IllegalStateException("Unhandled 
instruction: " + value.getInstruction());
+                        throw new IllegalStateException("Unhandled 
instruction: " + value.instruction());
                 }
             }
         };
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index a00ed777c85..a935797ad18 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -75,18 +75,18 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
 
             @Override
             public void process(final Record<KO, SubscriptionWrapper<K>> 
record) {
-                if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().getInstruction())) {
+                if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) {
                     dropRecord();
                     return;
                 }
-                if (record.value().getVersion() > 
SubscriptionWrapper.CURRENT_VERSION) {
+                if (record.value().version() > 
SubscriptionWrapper.CURRENT_VERSION) {
                     //Guard against modifications to SubscriptionWrapper. Need 
to ensure that there is compatibility
                     //with previous versions to enable rolling upgrades. Must 
develop a strategy for upgrading
                     //from older SubscriptionWrapper versions to newer 
versions.
                     throw new UnsupportedVersionException("SubscriptionWrapper 
is of an incompatible version.");
                 }
                 context().forward(
-                    record.withKey(new CombinedKey<>(record.key(), 
record.value().getPrimaryKey()))
+                    record.withKey(new CombinedKey<>(record.key(), 
record.value().primaryKey()))
                         .withValue(inferChange(record))
                         .withTimestamp(record.timestamp())
                 );
@@ -101,14 +101,14 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
             }
 
             private Change<ValueAndTimestamp<SubscriptionWrapper<K>>> 
inferBasedOnState(final Record<KO, SubscriptionWrapper<K>> record) {
-                final Bytes subscriptionKey = keySchema.toBytes(record.key(), 
record.value().getPrimaryKey());
+                final Bytes subscriptionKey = keySchema.toBytes(record.key(), 
record.value().primaryKey());
 
                 final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = 
ValueAndTimestamp.make(record.value(), record.timestamp());
                 final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = 
store.get(subscriptionKey);
 
                 //This store is used by the prefix scanner in 
ForeignTableJoinProcessorSupplier
-                if (record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
-                    record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
+                if (record.value().instruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
+                    record.value().instruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
                     store.delete(subscriptionKey);
                 } else {
                     store.put(subscriptionKey, newValue);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
index f19bf680d86..501edcd2432 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
@@ -48,19 +48,19 @@ public class SubscriptionResponseWrapper<FV> {
         this.primaryPartition = primaryPartition;
     }
 
-    public long[] getOriginalValueHash() {
+    public long[] originalValueHash() {
         return originalValueHash;
     }
 
-    public FV getForeignValue() {
+    public FV foreignValue() {
         return foreignValue;
     }
 
-    public byte getVersion() {
+    public byte version() {
         return version;
     }
 
-    public Integer getPrimaryPartition() {
+    public Integer primaryPartition() {
         return primaryPartition;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 12a14e7cc4d..9e143d4e296 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -67,24 +67,24 @@ public class SubscriptionResponseWrapperSerde<V> implements 
Serde<SubscriptionRe
             
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized 
data}
 
             //7-bit (0x7F) maximum for data version.
-            if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
+            if (Byte.compare((byte) 0x7F, data.version()) < 0) {
                 throw new 
UnsupportedVersionException("SubscriptionResponseWrapper version is larger than 
maximum supported 0x7F");
             }
 
-            final byte[] serializedData = data.getForeignValue() == null ? 
null : serializer.serialize(topic, data.getForeignValue());
+            final byte[] serializedData = data.foreignValue() == null ? null : 
serializer.serialize(topic, data.foreignValue());
             final int serializedDataLength = serializedData == null ? 0 : 
serializedData.length;
-            final long[] originalHash = data.getOriginalValueHash();
+            final long[] originalHash = data.originalValueHash();
             final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
 
             final ByteBuffer buf = ByteBuffer.allocate(1 + hashLength + 
serializedDataLength);
 
             if (originalHash != null) {
-                buf.put(data.getVersion());
+                buf.put(data.version());
                 buf.putLong(originalHash[0]);
                 buf.putLong(originalHash[1]);
             } else {
                 //Don't store hash as it's null.
-                buf.put((byte) (data.getVersion() | (byte) 0x80));
+                buf.put((byte) (data.version() | (byte) 0x80));
             }
 
             if (serializedData != null)
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
index 29470b38468..b104d420c70 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
@@ -57,7 +57,7 @@ public class SubscriptionWrapper<K> {
             this.value = value;
         }
 
-        public byte getValue() {
+        public byte value() {
             return value;
         }
 
@@ -89,23 +89,23 @@ public class SubscriptionWrapper<K> {
         this.primaryPartition = primaryPartition;
     }
 
-    public Instruction getInstruction() {
+    public Instruction instruction() {
         return instruction;
     }
 
-    public long[] getHash() {
+    public long[] hash() {
         return hash;
     }
 
-    public K getPrimaryKey() {
+    public K primaryKey() {
         return primaryKey;
     }
 
-    public byte getVersion() {
+    public byte version() {
         return version;
     }
 
-    public Integer getPrimaryPartition() {
+    public Integer primaryPartition() {
         return primaryPartition;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index 39839af1252..ed639eba0fe 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -107,17 +107,17 @@ public class SubscriptionWrapperSerde<K> extends 
WrappingNullableSerde<Subscript
             
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition}
 
             //7-bit (0x7F) maximum for data version.
-            if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
+            if (Byte.compare((byte) 0x7F, data.version()) < 0) {
                 throw new UnsupportedVersionException("SubscriptionWrapper 
version is larger than maximum supported 0x7F");
             }
 
-            final int version = data.getVersion();
+            final int version = data.version();
             if (upgradeFromV0 || version == 0) {
                 return serializeV0(data);
             } else if (version == 1) {
                 return serializeV1(data);
             } else {
-                throw new UnsupportedVersionException("Unsupported 
SubscriptionWrapper version " + data.getVersion());
+                throw new UnsupportedVersionException("Unsupported 
SubscriptionWrapper version " + data.version());
             }
         }
 
@@ -128,7 +128,7 @@ public class SubscriptionWrapperSerde<K> extends 
WrappingNullableSerde<Subscript
 
             return  primaryKeySerializer.serialize(
                 primaryKeySerializationPseudoTopic,
-                data.getPrimaryKey()
+                data.primaryKey()
             );
         }
 
@@ -136,7 +136,7 @@ public class SubscriptionWrapperSerde<K> extends 
WrappingNullableSerde<Subscript
             final byte[] primaryKeySerializedData = serializePrimaryKey(data);
             final ByteBuffer buf;
             int dataLength = 2 + primaryKeySerializedData.length + extraLength;
-            if (data.getHash() != null) {
+            if (data.hash() != null) {
                 dataLength += 2 * Long.BYTES;
                 buf = ByteBuffer.allocate(dataLength);
                 buf.put(version);
@@ -145,9 +145,9 @@ public class SubscriptionWrapperSerde<K> extends 
WrappingNullableSerde<Subscript
                 buf = ByteBuffer.allocate(dataLength);
                 buf.put((byte) (version | (byte) 0x80));
             }
-            buf.put(data.getInstruction().getValue());
-            final long[] elem = data.getHash();
-            if (data.getHash() != null) {
+            buf.put(data.instruction().value());
+            final long[] elem = data.hash();
+            if (data.hash() != null) {
                 buf.putLong(elem[0]);
                 buf.putLong(elem[1]);
             }
@@ -160,8 +160,8 @@ public class SubscriptionWrapperSerde<K> extends 
WrappingNullableSerde<Subscript
         }
 
         private byte[] serializeV1(final SubscriptionWrapper<K> data) {
-            final ByteBuffer buf = serializeCommon(data, data.getVersion(), 
Integer.BYTES);
-            buf.putInt(data.getPrimaryPartition());
+            final ByteBuffer buf = serializeCommon(data, data.version(), 
Integer.BYTES);
+            buf.putInt(data.primaryPartition());
             return buf.array();
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
index dc6f289b136..46c0e3fa67a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
@@ -76,7 +76,7 @@ public abstract class BaseRepartitionNode<K, V> extends GraphNode {
         this.processorParameters = processorParameters;
     }
 
-    public ProcessorParameters<K, V, ?, ?> getProcessorParameters() {
+    public ProcessorParameters<K, V, ?, ?> processorParameters() {
         return processorParameters;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 48fb38f5455..6cfdd53784c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -83,7 +83,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
     }
 
     public String queryableStoreName() {
-        return joinMerger().getQueryableName();
+        return joinMerger().queryableName();
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
index 241d4eb57df..f9cf9164d20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
@@ -116,11 +116,11 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
         return isSelfJoin;
     }
 
-    public ProcessorParameters<K, V1, ?, ?> 
getThisWindowedStreamProcessorParameters() {
+    public ProcessorParameters<K, V1, ?, ?> 
thisWindowedStreamProcessorParameters() {
         return thisWindowedStreamProcessorParameters;
     }
 
-    public ProcessorParameters<K, V2, ?, ?> 
getOtherWindowedStreamProcessorParameters() {
+    public ProcessorParameters<K, V2, ?, ?> 
otherWindowedStreamProcessorParameters() {
         return otherWindowedStreamProcessorParameters;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
index 800a2a52bff..e896e3a6179 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
@@ -36,21 +36,21 @@ public abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC
             Long.MAX_VALUE,
             Long.MAX_VALUE,
             SHUT_DOWN, // doesn't matter, given the bounds
-            getLogConfig()
+            logConfig()
         );
     }
 
     @Override
     public Suppressed.StrictBufferConfig shutDownWhenFull() {
-        return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN, 
getLogConfig());
+        return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN, 
logConfig());
     }
 
     @Override
     public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
-        return new EagerBufferConfigImpl(maxRecords(), maxBytes(), 
getLogConfig());
+        return new EagerBufferConfigImpl(maxRecords(), maxBytes(), 
logConfig());
     }
 
     public abstract boolean isLoggingEnabled();
 
-    public abstract Map<String, String> getLogConfig();
+    public abstract Map<String, String> logConfig();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index 7665e667423..e8bf8369534 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -77,7 +77,7 @@ public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.Eager
     }
 
     @Override
-    public Map<String, String> getLogConfig() {
+    public Map<String, String> logConfig() {
         return isLoggingEnabled() ? logConfig : Collections.emptyMap();
     }
 
@@ -92,19 +92,19 @@ public class EagerBufferConfigImpl extends 
BufferConfigInternal<Suppressed.Eager
         final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
         return maxRecords == that.maxRecords &&
             maxBytes == that.maxBytes &&
-            Objects.equals(getLogConfig(), that.getLogConfig());
+            Objects.equals(logConfig(), that.logConfig());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(maxRecords, maxBytes, getLogConfig());
+        return Objects.hash(maxRecords, maxBytes, logConfig());
     }
 
     @Override
     public String toString() {
         return "EagerBufferConfigImpl{maxRecords=" + maxRecords +
                 ", maxBytes=" + maxBytes +
-                ", logConfig=" + getLogConfig() +
+                ", logConfig=" + logConfig() +
                 "}";
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
index 2ca5ef9b4ee..34a9ce69ac5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -51,12 +51,12 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
 
     @Override
     public Suppressed.StrictBufferConfig withMaxRecords(final long 
recordLimit) {
-        return new StrictBufferConfigImpl(recordLimit, maxBytes, 
bufferFullStrategy, getLogConfig());
+        return new StrictBufferConfigImpl(recordLimit, maxBytes, 
bufferFullStrategy, logConfig());
     }
 
     @Override
     public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
-        return new StrictBufferConfigImpl(maxRecords, byteLimit, 
bufferFullStrategy, getLogConfig());
+        return new StrictBufferConfigImpl(maxRecords, byteLimit, 
bufferFullStrategy, logConfig());
     }
 
     @Override
@@ -90,7 +90,7 @@ public class StrictBufferConfigImpl extends 
BufferConfigInternal<Suppressed.Stri
     }
 
     @Override
-    public Map<String, String> getLogConfig() {
+    public Map<String, String> logConfig() {
         return isLoggingEnabled() ? logConfig : Collections.emptyMap();
     }
 
@@ -106,12 +106,12 @@ public class StrictBufferConfigImpl extends 
BufferConfigInternal<Suppressed.Stri
         return maxRecords == that.maxRecords &&
             maxBytes == that.maxBytes &&
             bufferFullStrategy == that.bufferFullStrategy &&
-            Objects.equals(getLogConfig(), ((StrictBufferConfigImpl) 
o).getLogConfig());
+            Objects.equals(logConfig(), ((StrictBufferConfigImpl) 
o).logConfig());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(maxRecords, maxBytes, bufferFullStrategy, 
getLogConfig());
+        return Objects.hash(maxRecords, maxBytes, bufferFullStrategy, 
logConfig());
     }
 
     @Override
@@ -119,7 +119,7 @@ public class StrictBufferConfigImpl extends 
BufferConfigInternal<Suppressed.Stri
         return "StrictBufferConfigImpl{maxKeys=" + maxRecords +
             ", maxBytes=" + maxBytes +
             ", bufferFullStrategy=" + bufferFullStrategy +
-            ", logConfig=" + getLogConfig().toString() +
+            ", logConfig=" + logConfig().toString() +
              '}';
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
index 22731317e3e..3142f65febf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
@@ -38,8 +38,8 @@ public class CombinedKeySchemaTest {
         final Bytes result = cks.toBytes("foreignKey", primary);
 
         final CombinedKey<String, Integer> deserializedKey = 
cks.fromBytes(result);
-        assertEquals("foreignKey", deserializedKey.getForeignKey());
-        assertEquals(primary, deserializedKey.getPrimaryKey());
+        assertEquals("foreignKey", deserializedKey.foreignKey());
+        assertEquals(primary, deserializedKey.primaryKey());
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
index cee96e00034..15a9f602640 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
@@ -77,7 +77,7 @@ public class SubscriptionJoinProcessorSupplierTest {
             new Record<>(
                 "pk1",
                 new SubscriptionResponseWrapper<>(
-                    newValue.getHash(),
+                    newValue.hash(),
                     null,
                     null),
                 1L
@@ -111,7 +111,7 @@ public class SubscriptionJoinProcessorSupplierTest {
             new Record<>(
                 "pk1",
                 new SubscriptionResponseWrapper<>(
-                    newValue.getHash(),
+                    newValue.hash(),
                     null,
                     12
                 ),
@@ -146,7 +146,7 @@ public class SubscriptionJoinProcessorSupplierTest {
             new Record<>(
                 "pk1",
                 new SubscriptionResponseWrapper<>(
-                    newValue.getHash(),
+                    newValue.hash(),
                     "foo",
                     null
                 ),
@@ -181,7 +181,7 @@ public class SubscriptionJoinProcessorSupplierTest {
             new Record<>(
                 "pk1",
                 new SubscriptionResponseWrapper<>(
-                    newValue.getHash(),
+                    newValue.hash(),
                     "foo",
                      12
                 ),
@@ -216,7 +216,7 @@ public class SubscriptionJoinProcessorSupplierTest {
             new Record<>(
                 "pk1",
                 new SubscriptionResponseWrapper<>(
-                    newValue.getHash(),
+                    newValue.hash(),
                     "foo",
                     null
                 ),
@@ -237,7 +237,7 @@ public class SubscriptionJoinProcessorSupplierTest {
             new Record<>(
                 "pk1",
                 new SubscriptionResponseWrapper<>(
-                    newValue.getHash(),
+                    newValue.hash(),
                     null,
                     null
                 ),
@@ -270,7 +270,7 @@ public class SubscriptionJoinProcessorSupplierTest {
             new Record<>(
                 "pk1",
                 new SubscriptionResponseWrapper<>(
-                    newValue.getHash(),
+                    newValue.hash(),
                     "foo",
                     12
                 ),
@@ -291,7 +291,7 @@ public class SubscriptionJoinProcessorSupplierTest {
             new Record<>(
                 "pk1",
                 new SubscriptionResponseWrapper<>(
-                    newValue.getHash(),
+                    newValue.hash(),
                     null,
                     12
                 ),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index e4b35685c2f..9be5e39cc15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -82,9 +82,9 @@ public class SubscriptionResponseWrapperSerdeTest {
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
         final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.getOriginalValueHash());
-        assertEquals(foreignValue, result.getForeignValue());
-        assertNull(result.getPrimaryPartition());
+        assertArrayEquals(hashedValue, result.originalValueHash());
+        assertEquals(foreignValue, result.foreignValue());
+        assertNull(result.primaryPartition());
     }
 
     @Test
@@ -96,9 +96,9 @@ public class SubscriptionResponseWrapperSerdeTest {
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
         final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.getOriginalValueHash());
-        assertNull(result.getForeignValue());
-        assertNull(result.getPrimaryPartition());
+        assertArrayEquals(hashedValue, result.originalValueHash());
+        assertNull(result.foreignValue());
+        assertNull(result.primaryPartition());
     }
 
     @Test
@@ -111,9 +111,9 @@ public class SubscriptionResponseWrapperSerdeTest {
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
         final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.getOriginalValueHash());
-        assertEquals(foreignValue, result.getForeignValue());
-        assertNull(result.getPrimaryPartition());
+        assertArrayEquals(hashedValue, result.originalValueHash());
+        assertEquals(foreignValue, result.foreignValue());
+        assertNull(result.primaryPartition());
     }
 
     @Test
@@ -126,9 +126,9 @@ public class SubscriptionResponseWrapperSerdeTest {
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
         final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.getOriginalValueHash());
-        assertEquals(foreignValue, result.getForeignValue());
-        assertNull(result.getPrimaryPartition());
+        assertArrayEquals(hashedValue, result.originalValueHash());
+        assertEquals(foreignValue, result.foreignValue());
+        assertNull(result.primaryPartition());
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index b64d497fd61..db2842401b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -51,11 +51,11 @@ public class SubscriptionWrapperSerdeTest {
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer()
             .deserialize(null, serialized);
 
-        assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, 
deserialized.getInstruction());
-        assertArrayEquals(hashedValue, deserialized.getHash());
-        assertEquals(originalKey, deserialized.getPrimaryKey());
-        assertEquals(primaryPartition, deserialized.getPrimaryPartition());
-        assertEquals(version, deserialized.getVersion());
+        assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, 
deserialized.instruction());
+        assertArrayEquals(hashedValue, deserialized.hash());
+        assertEquals(originalKey, deserialized.primaryKey());
+        assertEquals(primaryPartition, deserialized.primaryPartition());
+        assertEquals(version, deserialized.version());
     }
 
     @Test
@@ -76,11 +76,11 @@ public class SubscriptionWrapperSerdeTest {
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer()
             .deserialize(null, serialized);
 
-        assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, 
deserialized.getInstruction());
-        assertArrayEquals(hashedValue, deserialized.getHash());
-        assertEquals(originalKey, deserialized.getPrimaryKey());
-        assertEquals(primaryPartition, deserialized.getPrimaryPartition());
-        assertEquals(version, deserialized.getVersion());
+        assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, 
deserialized.instruction());
+        assertArrayEquals(hashedValue, deserialized.hash());
+        assertEquals(originalKey, deserialized.primaryKey());
+        assertEquals(primaryPartition, deserialized.primaryPartition());
+        assertEquals(version, deserialized.version());
     }
 
     @Test
@@ -104,11 +104,11 @@ public class SubscriptionWrapperSerdeTest {
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer()
             .deserialize(null, serialized);
 
-        assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, 
deserialized.getInstruction());
-        assertArrayEquals(hashedValue, deserialized.getHash());
-        assertEquals(originalKey, deserialized.getPrimaryKey());
-        assertEquals(0, deserialized.getVersion());
-        assertNull(deserialized.getPrimaryPartition());
+        assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, 
deserialized.instruction());
+        assertArrayEquals(hashedValue, deserialized.hash());
+        assertEquals(originalKey, deserialized.primaryKey());
+        assertEquals(0, deserialized.version());
+        assertNull(deserialized.primaryPartition());
     }
 
     @Test
@@ -128,11 +128,11 @@ public class SubscriptionWrapperSerdeTest {
         final byte[] serialized = swSerde.serializer().serialize(null, 
wrapper);
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer().deserialize(null, serialized);
 
-        
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
 deserialized.getInstruction());
-        assertArrayEquals(hashedValue, deserialized.getHash());
-        assertEquals(originalKey, deserialized.getPrimaryKey());
-        assertEquals(primaryPartition, deserialized.getPrimaryPartition());
-        assertEquals(version, deserialized.getVersion());
+        
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
 deserialized.instruction());
+        assertArrayEquals(hashedValue, deserialized.hash());
+        assertEquals(originalKey, deserialized.primaryKey());
+        assertEquals(primaryPartition, deserialized.primaryPartition());
+        assertEquals(version, deserialized.version());
     }
 
     @Test
@@ -153,11 +153,11 @@ public class SubscriptionWrapperSerdeTest {
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer()
             .deserialize(null, serialized);
 
-        
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
 deserialized.getInstruction());
-        assertArrayEquals(hashedValue, deserialized.getHash());
-        assertEquals(originalKey, deserialized.getPrimaryKey());
-        assertEquals(primaryPartition, deserialized.getPrimaryPartition());
-        assertEquals(version, deserialized.getVersion());
+        
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
 deserialized.instruction());
+        assertArrayEquals(hashedValue, deserialized.hash());
+        assertEquals(originalKey, deserialized.primaryKey());
+        assertEquals(primaryPartition, deserialized.primaryPartition());
+        assertEquals(version, deserialized.version());
     }
 
     @Test
@@ -176,11 +176,11 @@ public class SubscriptionWrapperSerdeTest {
         final byte[] serialized = swSerde.serializer().serialize(null, 
wrapper);
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) 
swSerde.deserializer().deserialize(null, serialized);
 
-        
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
 deserialized.getInstruction());
-        assertArrayEquals(hashedValue, deserialized.getHash());
-        assertEquals(originalKey, deserialized.getPrimaryKey());
-        assertEquals(primaryPartition, deserialized.getPrimaryPartition());
-        assertEquals(version, deserialized.getVersion());
+        
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
 deserialized.instruction());
+        assertArrayEquals(hashedValue, deserialized.hash());
+        assertEquals(originalKey, deserialized.primaryKey());
+        assertEquals(primaryPartition, deserialized.primaryPartition());
+        assertEquals(version, deserialized.version());
     }
 
     @Test

Reply via email to