This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1f5aea2a864 MINOR: remove get prefix for internal DSL methods (#17050)
1f5aea2a864 is described below
commit 1f5aea2a864e873dd4c0207d8039fe3fd24c66a5
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sun Sep 1 02:14:51 2024 -0700
MINOR: remove get prefix for internal DSL methods (#17050)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kstream/internals/InternalStreamsBuilder.java | 8 +--
.../kstream/internals/KStreamKStreamJoin.java | 8 +--
.../internals/KStreamKStreamJoinLeftSide.java | 5 +-
.../internals/KStreamKStreamJoinRightSide.java | 5 +-
.../streams/kstream/internals/KTableImpl.java | 4 +-
.../kstream/internals/KTableKTableJoinMerger.java | 2 +-
.../internals/foreignkeyjoin/CombinedKey.java | 4 +-
.../ForeignTableJoinProcessorSupplier.java | 6 +--
.../ResponseJoinProcessorSupplier.java | 8 +--
.../SubscriptionJoinProcessorSupplier.java | 28 +++++-----
.../SubscriptionReceiveProcessorSupplier.java | 12 ++---
.../SubscriptionResponseWrapper.java | 8 +--
.../SubscriptionResponseWrapperSerde.java | 10 ++--
.../foreignkeyjoin/SubscriptionWrapper.java | 12 ++---
.../foreignkeyjoin/SubscriptionWrapperSerde.java | 20 ++++----
.../internals/graph/BaseRepartitionNode.java | 2 +-
.../internals/graph/KTableKTableJoinNode.java | 2 +-
.../internals/graph/StreamStreamJoinNode.java | 4 +-
.../internals/suppress/BufferConfigInternal.java | 8 +--
.../internals/suppress/EagerBufferConfigImpl.java | 8 +--
.../internals/suppress/StrictBufferConfigImpl.java | 12 ++---
.../foreignkeyjoin/CombinedKeySchemaTest.java | 4 +-
.../SubscriptionJoinProcessorSupplierTest.java | 16 +++---
.../SubscriptionResponseWrapperSerdeTest.java | 24 ++++-----
.../SubscriptionWrapperSerdeTest.java | 60 +++++++++++-----------
25 files changed, 139 insertions(+), 141 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 13a8d64b66d..92dde06e9c0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -364,10 +364,10 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
private void rewriteRepartitionNodes() {
final Set<BaseRepartitionNode<?, ?>> nodes = new
NodesWithRelaxedNullKeyJoinDownstream(root).find();
for (final BaseRepartitionNode<?, ?> partitionNode : nodes) {
- if (partitionNode.getProcessorParameters() != null) {
+ if (partitionNode.processorParameters() != null) {
partitionNode.setProcessorParameters(new ProcessorParameters<>(
new KStreamFilter<>((k, v) -> k != null, false),
- partitionNode.getProcessorParameters().processorName()
+ partitionNode.processorParameters().processorName()
));
}
}
@@ -445,9 +445,9 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
GraphNode left = null, right = null;
for (final GraphNode child: parent.children()) {
if (child instanceof WindowedStreamProcessorNode &&
child.buildPriority() < joinNode.buildPriority()) {
- if
(child.nodeName().equals(joinNode.getThisWindowedStreamProcessorParameters().processorName()))
{
+ if
(child.nodeName().equals(joinNode.thisWindowedStreamProcessorParameters().processorName()))
{
left = child;
- } else if
(child.nodeName().equals(joinNode.getOtherWindowedStreamProcessorParameters().processorName()))
{
+ } else if
(child.nodeName().equals(joinNode.otherWindowedStreamProcessorParameters().processorName()))
{
right = child;
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 9ba8f316271..d3c0c4c0a22 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -170,9 +170,9 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut,
VThis, VOther> impleme
protected abstract TimestampedKeyAndJoinSide<K> makeOtherKey(final K
key, final long timestamp);
- protected abstract VThis getThisValue(final LeftOrRightValue<? extends
VLeft, ? extends VRight> leftOrRightValue);
+ protected abstract VThis thisValue(final LeftOrRightValue<? extends
VLeft, ? extends VRight> leftOrRightValue);
- protected abstract VOther getOtherValue(final LeftOrRightValue<?
extends VLeft, ? extends VRight> leftOrRightValue);
+ protected abstract VOther otherValue(final LeftOrRightValue<? extends
VLeft, ? extends VRight> leftOrRightValue);
private void emitNonJoinedOuterRecords(final
KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>>
store,
final Record<K, VThis> record) {
@@ -252,8 +252,8 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut,
VThis, VOther> impleme
final
LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
final K key = timestampedKeyAndJoinSide.key();
final long timestamp = timestampedKeyAndJoinSide.timestamp();
- final VThis thisValue = getThisValue(leftOrRightValue);
- final VOther otherValue = getOtherValue(leftOrRightValue);
+ final VThis thisValue = thisValue(leftOrRightValue);
+ final VOther otherValue = otherValue(leftOrRightValue);
final VOut nullJoinedValue = joiner.apply(key, thisValue,
otherValue);
context().forward(
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
index df03e39b8c5..1a1fbdffb2d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
@@ -58,13 +58,12 @@ class KStreamKStreamJoinLeftSide<K, VLeft, VRight, VOut>
extends KStreamKStreamJ
return TimestampedKeyAndJoinSide.makeRight(key, timestamp);
}
- @Override
- public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ?
extends VRight> leftOrRightValue) {
+ public VLeft thisValue(final LeftOrRightValue<? extends VLeft, ?
extends VRight> leftOrRightValue) {
return leftOrRightValue.leftValue();
}
@Override
- public VRight getOtherValue(final LeftOrRightValue<? extends VLeft, ?
extends VRight> leftOrRightValue) {
+ public VRight otherValue(final LeftOrRightValue<? extends VLeft, ?
extends VRight> leftOrRightValue) {
return leftOrRightValue.rightValue();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
index ec29d5f12b8..d09593f3b28 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
@@ -57,13 +57,12 @@ class KStreamKStreamJoinRightSide<K, VLeft, VRight, VOut>
extends KStreamKStream
return TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
}
- @Override
- public VRight getThisValue(final LeftOrRightValue<? extends VLeft, ?
extends VRight> leftOrRightValue) {
+ public VRight thisValue(final LeftOrRightValue<? extends VLeft, ?
extends VRight> leftOrRightValue) {
return leftOrRightValue.rightValue();
}
@Override
- public VLeft getOtherValue(final LeftOrRightValue<? extends VLeft, ?
extends VRight> leftOrRightValue) {
+ public VLeft otherValue(final LeftOrRightValue<? extends VLeft, ?
extends VRight> leftOrRightValue) {
return leftOrRightValue.leftValue();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f06059f7c0a..09efdb78006 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -573,7 +573,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V,
Change<V>>> storeBuilder;
if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
- final Map<String, String> topicConfig =
suppressedInternal.bufferConfig().getLogConfig();
+ final Map<String, String> topicConfig =
suppressedInternal.bufferConfig().logConfig();
storeBuilder = new
InMemoryTimeOrderedKeyValueChangeBuffer.Builder<>(
storeName,
keySerde,
@@ -1165,7 +1165,7 @@ public class KTableImpl<K, S, V> extends
AbstractStream<K, V> implements KTable<
final StreamPartitioner<K, SubscriptionResponseWrapper<VO>>
defaultForeignResponseSinkPartitioner =
(topic, key, subscriptionResponseWrapper, numPartitions) -> {
- final Integer partition =
subscriptionResponseWrapper.getPrimaryPartition();
+ final Integer partition =
subscriptionResponseWrapper.primaryPartition();
return partition == null ? Optional.empty() :
Optional.of(Collections.singleton(partition));
};
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 9d381635586..7924f8ea857 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -44,7 +44,7 @@ public class KTableKTableJoinMerger<K, V> implements
KTableProcessorSupplier<K,
this.queryableName = queryableName;
}
- public String getQueryableName() {
+ public String queryableName() {
return queryableName;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
index 01e94da345a..016f24cb4a0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
@@ -29,11 +29,11 @@ public class CombinedKey<KF, KP> {
this.primaryKey = primaryKey;
}
- public KF getForeignKey() {
+ public KF foreignKey() {
return foreignKey;
}
- public KP getPrimaryKey() {
+ public KP primaryKey() {
return primaryKey;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
index cd7f13f81e2..7c3e982a8ed 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
@@ -123,11 +123,11 @@ public class ForeignTableJoinProcessorSupplier<K, KO, VO>
implements
if (prefixEquals(next.key.get(), prefixBytes.get())) {
final CombinedKey<KO, K> combinedKey =
keySchema.fromBytes(next.key);
context().forward(
- record.withKey(combinedKey.getPrimaryKey())
+ record.withKey(combinedKey.primaryKey())
.withValue(new SubscriptionResponseWrapper<>(
- next.value.value().getHash(),
+ next.value.value().hash(),
record.value().newValue,
- next.value.value().getPrimaryPartition()))
+ next.value.value().primaryPartition()))
);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
index 6a55d3e85f6..50c2b356344 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
@@ -99,7 +99,7 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR>
implements ProcessorSup
@Override
public void process(final Record<K,
SubscriptionResponseWrapper<VO>> record) {
- if (record.value().getVersion() !=
SubscriptionResponseWrapper.CURRENT_VERSION) {
+ if (record.value().version() !=
SubscriptionResponseWrapper.CURRENT_VERSION) {
//Guard against modifications to
SubscriptionResponseWrapper. Need to ensure that there is
//compatibility with previous versions to enable rolling
upgrades. Must develop a strategy for
//upgrading from older SubscriptionWrapper versions to
newer versions.
@@ -111,16 +111,16 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR>
implements ProcessorSup
null :
Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic,
currentValueWithTimestamp.value()));
- final long[] messageHash =
record.value().getOriginalValueHash();
+ final long[] messageHash = record.value().originalValueHash();
//If this value doesn't match the current value from the
original table, it is stale and should be discarded.
if (java.util.Arrays.equals(messageHash, currentHash)) {
final VR result;
- if (record.value().getForeignValue() == null && (!leftJoin
|| currentValueWithTimestamp == null)) {
+ if (record.value().foreignValue() == null && (!leftJoin ||
currentValueWithTimestamp == null)) {
result = null; //Emit tombstone
} else {
- result = joiner.apply(currentValueWithTimestamp ==
null ? null : currentValueWithTimestamp.value(),
record.value().getForeignValue());
+ result = joiner.apply(currentValueWithTimestamp ==
null ? null : currentValueWithTimestamp.value(), record.value().foreignValue());
}
context().forward(record.withValue(result));
} else {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
index 388b669e988..008ef665817 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
@@ -70,7 +70,7 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
Objects.requireNonNull(valueAndTimestamp, "This processor
should never see a null newValue.");
final SubscriptionWrapper<K> value = valueAndTimestamp.value();
- if (value.getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
+ if (value.version() > SubscriptionWrapper.CURRENT_VERSION) {
//Guard against modifications to SubscriptionWrapper. Need
to ensure that there is compatibility
//with previous versions to enable rolling upgrades. Must
develop a strategy for upgrading
//from older SubscriptionWrapper versions to newer
versions.
@@ -78,23 +78,23 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
}
final ValueAndTimestamp<VO> foreignValueAndTime =
- record.key().getForeignKey() == null ?
+ record.key().foreignKey() == null ?
null :
- foreignValues.get(record.key().getForeignKey());
+ foreignValues.get(record.key().foreignKey());
final long resultTimestamp =
foreignValueAndTime == null ?
valueAndTimestamp.timestamp() :
Math.max(valueAndTimestamp.timestamp(),
foreignValueAndTime.timestamp());
- switch (value.getInstruction()) {
+ switch (value.instruction()) {
case DELETE_KEY_AND_PROPAGATE:
context().forward(
- record.withKey(record.key().getPrimaryKey())
+ record.withKey(record.key().primaryKey())
.withValue(new SubscriptionResponseWrapper<VO>(
- value.getHash(),
+ value.hash(),
null,
- value.getPrimaryPartition()
+ value.primaryPartition()
))
.withTimestamp(resultTimestamp)
);
@@ -106,11 +106,11 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
final VO valueToSend = foreignValueAndTime == null ?
null : foreignValueAndTime.value();
context().forward(
- record.withKey(record.key().getPrimaryKey())
+ record.withKey(record.key().primaryKey())
.withValue(new SubscriptionResponseWrapper<>(
- value.getHash(),
+ value.hash(),
valueToSend,
- value.getPrimaryPartition()
+ value.primaryPartition()
))
.withTimestamp(resultTimestamp)
);
@@ -118,11 +118,11 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
case PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE:
if (foreignValueAndTime != null) {
context().forward(
- record.withKey(record.key().getPrimaryKey())
+ record.withKey(record.key().primaryKey())
.withValue(new
SubscriptionResponseWrapper<>(
- value.getHash(),
+ value.hash(),
foreignValueAndTime.value(),
- value.getPrimaryPartition()
+ value.primaryPartition()
))
.withTimestamp(resultTimestamp)
);
@@ -131,7 +131,7 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
case DELETE_KEY_NO_PROPAGATE:
break;
default:
- throw new IllegalStateException("Unhandled
instruction: " + value.getInstruction());
+ throw new IllegalStateException("Unhandled
instruction: " + value.instruction());
}
}
};
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index a00ed777c85..a935797ad18 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -75,18 +75,18 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
@Override
public void process(final Record<KO, SubscriptionWrapper<K>>
record) {
- if (record.key() == null &&
!SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().getInstruction()))
{
+ if (record.key() == null &&
!SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction()))
{
dropRecord();
return;
}
- if (record.value().getVersion() >
SubscriptionWrapper.CURRENT_VERSION) {
+ if (record.value().version() >
SubscriptionWrapper.CURRENT_VERSION) {
//Guard against modifications to SubscriptionWrapper. Need
to ensure that there is compatibility
//with previous versions to enable rolling upgrades. Must
develop a strategy for upgrading
//from older SubscriptionWrapper versions to newer
versions.
throw new UnsupportedVersionException("SubscriptionWrapper
is of an incompatible version.");
}
context().forward(
- record.withKey(new CombinedKey<>(record.key(),
record.value().getPrimaryKey()))
+ record.withKey(new CombinedKey<>(record.key(),
record.value().primaryKey()))
.withValue(inferChange(record))
.withTimestamp(record.timestamp())
);
@@ -101,14 +101,14 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
}
private Change<ValueAndTimestamp<SubscriptionWrapper<K>>>
inferBasedOnState(final Record<KO, SubscriptionWrapper<K>> record) {
- final Bytes subscriptionKey = keySchema.toBytes(record.key(),
record.value().getPrimaryKey());
+ final Bytes subscriptionKey = keySchema.toBytes(record.key(),
record.value().primaryKey());
final ValueAndTimestamp<SubscriptionWrapper<K>> newValue =
ValueAndTimestamp.make(record.value(), record.timestamp());
final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue =
store.get(subscriptionKey);
//This store is used by the prefix scanner in
ForeignTableJoinProcessorSupplier
- if
(record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE)
||
-
record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE))
{
+ if
(record.value().instruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE)
||
+
record.value().instruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE))
{
store.delete(subscriptionKey);
} else {
store.put(subscriptionKey, newValue);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
index f19bf680d86..501edcd2432 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
@@ -48,19 +48,19 @@ public class SubscriptionResponseWrapper<FV> {
this.primaryPartition = primaryPartition;
}
- public long[] getOriginalValueHash() {
+ public long[] originalValueHash() {
return originalValueHash;
}
- public FV getForeignValue() {
+ public FV foreignValue() {
return foreignValue;
}
- public byte getVersion() {
+ public byte version() {
return version;
}
- public Integer getPrimaryPartition() {
+ public Integer primaryPartition() {
return primaryPartition;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 12a14e7cc4d..9e143d4e296 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -67,24 +67,24 @@ public class SubscriptionResponseWrapperSerde<V> implements
Serde<SubscriptionRe
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized
data}
//7-bit (0x7F) maximum for data version.
- if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
+ if (Byte.compare((byte) 0x7F, data.version()) < 0) {
throw new
UnsupportedVersionException("SubscriptionResponseWrapper version is larger than
maximum supported 0x7F");
}
- final byte[] serializedData = data.getForeignValue() == null ?
null : serializer.serialize(topic, data.getForeignValue());
+ final byte[] serializedData = data.foreignValue() == null ? null :
serializer.serialize(topic, data.foreignValue());
final int serializedDataLength = serializedData == null ? 0 :
serializedData.length;
- final long[] originalHash = data.getOriginalValueHash();
+ final long[] originalHash = data.originalValueHash();
final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
final ByteBuffer buf = ByteBuffer.allocate(1 + hashLength +
serializedDataLength);
if (originalHash != null) {
- buf.put(data.getVersion());
+ buf.put(data.version());
buf.putLong(originalHash[0]);
buf.putLong(originalHash[1]);
} else {
//Don't store hash as it's null.
- buf.put((byte) (data.getVersion() | (byte) 0x80));
+ buf.put((byte) (data.version() | (byte) 0x80));
}
if (serializedData != null)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
index 29470b38468..b104d420c70 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
@@ -57,7 +57,7 @@ public class SubscriptionWrapper<K> {
this.value = value;
}
- public byte getValue() {
+ public byte value() {
return value;
}
@@ -89,23 +89,23 @@ public class SubscriptionWrapper<K> {
this.primaryPartition = primaryPartition;
}
- public Instruction getInstruction() {
+ public Instruction instruction() {
return instruction;
}
- public long[] getHash() {
+ public long[] hash() {
return hash;
}
- public K getPrimaryKey() {
+ public K primaryKey() {
return primaryKey;
}
- public byte getVersion() {
+ public byte version() {
return version;
}
- public Integer getPrimaryPartition() {
+ public Integer primaryPartition() {
return primaryPartition;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index 39839af1252..ed639eba0fe 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -107,17 +107,17 @@ public class SubscriptionWrapperSerde<K> extends
WrappingNullableSerde<Subscript
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition}
//7-bit (0x7F) maximum for data version.
- if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
+ if (Byte.compare((byte) 0x7F, data.version()) < 0) {
throw new UnsupportedVersionException("SubscriptionWrapper
version is larger than maximum supported 0x7F");
}
- final int version = data.getVersion();
+ final int version = data.version();
if (upgradeFromV0 || version == 0) {
return serializeV0(data);
} else if (version == 1) {
return serializeV1(data);
} else {
- throw new UnsupportedVersionException("Unsupported
SubscriptionWrapper version " + data.getVersion());
+ throw new UnsupportedVersionException("Unsupported
SubscriptionWrapper version " + data.version());
}
}
@@ -128,7 +128,7 @@ public class SubscriptionWrapperSerde<K> extends
WrappingNullableSerde<Subscript
return primaryKeySerializer.serialize(
primaryKeySerializationPseudoTopic,
- data.getPrimaryKey()
+ data.primaryKey()
);
}
@@ -136,7 +136,7 @@ public class SubscriptionWrapperSerde<K> extends
WrappingNullableSerde<Subscript
final byte[] primaryKeySerializedData = serializePrimaryKey(data);
final ByteBuffer buf;
int dataLength = 2 + primaryKeySerializedData.length + extraLength;
- if (data.getHash() != null) {
+ if (data.hash() != null) {
dataLength += 2 * Long.BYTES;
buf = ByteBuffer.allocate(dataLength);
buf.put(version);
@@ -145,9 +145,9 @@ public class SubscriptionWrapperSerde<K> extends
WrappingNullableSerde<Subscript
buf = ByteBuffer.allocate(dataLength);
buf.put((byte) (version | (byte) 0x80));
}
- buf.put(data.getInstruction().getValue());
- final long[] elem = data.getHash();
- if (data.getHash() != null) {
+ buf.put(data.instruction().value());
+ final long[] elem = data.hash();
+ if (data.hash() != null) {
buf.putLong(elem[0]);
buf.putLong(elem[1]);
}
@@ -160,8 +160,8 @@ public class SubscriptionWrapperSerde<K> extends
WrappingNullableSerde<Subscript
}
private byte[] serializeV1(final SubscriptionWrapper<K> data) {
- final ByteBuffer buf = serializeCommon(data, data.getVersion(),
Integer.BYTES);
- buf.putInt(data.getPrimaryPartition());
+ final ByteBuffer buf = serializeCommon(data, data.version(),
Integer.BYTES);
+ buf.putInt(data.primaryPartition());
return buf.array();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
index dc6f289b136..46c0e3fa67a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
@@ -76,7 +76,7 @@ public abstract class BaseRepartitionNode<K, V> extends
GraphNode {
this.processorParameters = processorParameters;
}
- public ProcessorParameters<K, V, ?, ?> getProcessorParameters() {
+ public ProcessorParameters<K, V, ?, ?> processorParameters() {
return processorParameters;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 48fb38f5455..6cfdd53784c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -83,7 +83,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
}
public String queryableStoreName() {
- return joinMerger().getQueryableName();
+ return joinMerger().queryableName();
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
index 241d4eb57df..f9cf9164d20 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
@@ -116,11 +116,11 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
return isSelfJoin;
}
- public ProcessorParameters<K, V1, ?, ?>
getThisWindowedStreamProcessorParameters() {
+ public ProcessorParameters<K, V1, ?, ?>
thisWindowedStreamProcessorParameters() {
return thisWindowedStreamProcessorParameters;
}
- public ProcessorParameters<K, V2, ?, ?>
getOtherWindowedStreamProcessorParameters() {
+ public ProcessorParameters<K, V2, ?, ?>
otherWindowedStreamProcessorParameters() {
return otherWindowedStreamProcessorParameters;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
index 800a2a52bff..e896e3a6179 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
@@ -36,21 +36,21 @@ public abstract class BufferConfigInternal<BC extends
Suppressed.BufferConfig<BC
Long.MAX_VALUE,
Long.MAX_VALUE,
SHUT_DOWN, // doesn't matter, given the bounds
- getLogConfig()
+ logConfig()
);
}
@Override
public Suppressed.StrictBufferConfig shutDownWhenFull() {
- return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN,
getLogConfig());
+ return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN,
logConfig());
}
@Override
public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
- return new EagerBufferConfigImpl(maxRecords(), maxBytes(),
getLogConfig());
+ return new EagerBufferConfigImpl(maxRecords(), maxBytes(),
logConfig());
}
public abstract boolean isLoggingEnabled();
- public abstract Map<String, String> getLogConfig();
+ public abstract Map<String, String> logConfig();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index 7665e667423..e8bf8369534 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -77,7 +77,7 @@ public class EagerBufferConfigImpl extends
BufferConfigInternal<Suppressed.Eager
}
@Override
- public Map<String, String> getLogConfig() {
+ public Map<String, String> logConfig() {
return isLoggingEnabled() ? logConfig : Collections.emptyMap();
}
@@ -92,19 +92,19 @@ public class EagerBufferConfigImpl extends
BufferConfigInternal<Suppressed.Eager
final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
return maxRecords == that.maxRecords &&
maxBytes == that.maxBytes &&
- Objects.equals(getLogConfig(), that.getLogConfig());
+ Objects.equals(logConfig(), that.logConfig());
}
@Override
public int hashCode() {
- return Objects.hash(maxRecords, maxBytes, getLogConfig());
+ return Objects.hash(maxRecords, maxBytes, logConfig());
}
@Override
public String toString() {
return "EagerBufferConfigImpl{maxRecords=" + maxRecords +
", maxBytes=" + maxBytes +
- ", logConfig=" + getLogConfig() +
+ ", logConfig=" + logConfig() +
"}";
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
index 2ca5ef9b4ee..34a9ce69ac5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -51,12 +51,12 @@ public class StrictBufferConfigImpl extends
BufferConfigInternal<Suppressed.Stri
@Override
public Suppressed.StrictBufferConfig withMaxRecords(final long
recordLimit) {
- return new StrictBufferConfigImpl(recordLimit, maxBytes,
bufferFullStrategy, getLogConfig());
+ return new StrictBufferConfigImpl(recordLimit, maxBytes,
bufferFullStrategy, logConfig());
}
@Override
public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
- return new StrictBufferConfigImpl(maxRecords, byteLimit,
bufferFullStrategy, getLogConfig());
+ return new StrictBufferConfigImpl(maxRecords, byteLimit,
bufferFullStrategy, logConfig());
}
@Override
@@ -90,7 +90,7 @@ public class StrictBufferConfigImpl extends
BufferConfigInternal<Suppressed.Stri
}
@Override
- public Map<String, String> getLogConfig() {
+ public Map<String, String> logConfig() {
return isLoggingEnabled() ? logConfig : Collections.emptyMap();
}
@@ -106,12 +106,12 @@ public class StrictBufferConfigImpl extends
BufferConfigInternal<Suppressed.Stri
return maxRecords == that.maxRecords &&
maxBytes == that.maxBytes &&
bufferFullStrategy == that.bufferFullStrategy &&
- Objects.equals(getLogConfig(), ((StrictBufferConfigImpl)
o).getLogConfig());
+ Objects.equals(logConfig(), ((StrictBufferConfigImpl)
o).logConfig());
}
@Override
public int hashCode() {
- return Objects.hash(maxRecords, maxBytes, bufferFullStrategy,
getLogConfig());
+ return Objects.hash(maxRecords, maxBytes, bufferFullStrategy,
logConfig());
}
@Override
@@ -119,7 +119,7 @@ public class StrictBufferConfigImpl extends
BufferConfigInternal<Suppressed.Stri
return "StrictBufferConfigImpl{maxKeys=" + maxRecords +
", maxBytes=" + maxBytes +
", bufferFullStrategy=" + bufferFullStrategy +
- ", logConfig=" + getLogConfig().toString() +
+ ", logConfig=" + logConfig().toString() +
'}';
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
index 22731317e3e..3142f65febf 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
@@ -38,8 +38,8 @@ public class CombinedKeySchemaTest {
final Bytes result = cks.toBytes("foreignKey", primary);
final CombinedKey<String, Integer> deserializedKey =
cks.fromBytes(result);
- assertEquals("foreignKey", deserializedKey.getForeignKey());
- assertEquals(primary, deserializedKey.getPrimaryKey());
+ assertEquals("foreignKey", deserializedKey.foreignKey());
+ assertEquals(primary, deserializedKey.primaryKey());
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
index cee96e00034..15a9f602640 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
@@ -77,7 +77,7 @@ public class SubscriptionJoinProcessorSupplierTest {
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
- newValue.getHash(),
+ newValue.hash(),
null,
null),
1L
@@ -111,7 +111,7 @@ public class SubscriptionJoinProcessorSupplierTest {
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
- newValue.getHash(),
+ newValue.hash(),
null,
12
),
@@ -146,7 +146,7 @@ public class SubscriptionJoinProcessorSupplierTest {
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
- newValue.getHash(),
+ newValue.hash(),
"foo",
null
),
@@ -181,7 +181,7 @@ public class SubscriptionJoinProcessorSupplierTest {
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
- newValue.getHash(),
+ newValue.hash(),
"foo",
12
),
@@ -216,7 +216,7 @@ public class SubscriptionJoinProcessorSupplierTest {
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
- newValue.getHash(),
+ newValue.hash(),
"foo",
null
),
@@ -237,7 +237,7 @@ public class SubscriptionJoinProcessorSupplierTest {
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
- newValue.getHash(),
+ newValue.hash(),
null,
null
),
@@ -270,7 +270,7 @@ public class SubscriptionJoinProcessorSupplierTest {
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
- newValue.getHash(),
+ newValue.hash(),
"foo",
12
),
@@ -291,7 +291,7 @@ public class SubscriptionJoinProcessorSupplierTest {
new Record<>(
"pk1",
new SubscriptionResponseWrapper<>(
- newValue.getHash(),
+ newValue.hash(),
null,
12
),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index e4b35685c2f..9be5e39cc15 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -82,9 +82,9 @@ public class SubscriptionResponseWrapperSerdeTest {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.getOriginalValueHash());
- assertEquals(foreignValue, result.getForeignValue());
- assertNull(result.getPrimaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
}
@Test
@@ -96,9 +96,9 @@ public class SubscriptionResponseWrapperSerdeTest {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.getOriginalValueHash());
- assertNull(result.getForeignValue());
- assertNull(result.getPrimaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertNull(result.foreignValue());
+ assertNull(result.primaryPartition());
}
@Test
@@ -111,9 +111,9 @@ public class SubscriptionResponseWrapperSerdeTest {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.getOriginalValueHash());
- assertEquals(foreignValue, result.getForeignValue());
- assertNull(result.getPrimaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
}
@Test
@@ -126,9 +126,9 @@ public class SubscriptionResponseWrapperSerdeTest {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.getOriginalValueHash());
- assertEquals(foreignValue, result.getForeignValue());
- assertNull(result.getPrimaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index b64d497fd61..db2842401b5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -51,11 +51,11 @@ public class SubscriptionWrapperSerdeTest {
final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer()
.deserialize(null, serialized);
- assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
deserialized.getInstruction());
- assertArrayEquals(hashedValue, deserialized.getHash());
- assertEquals(originalKey, deserialized.getPrimaryKey());
- assertEquals(primaryPartition, deserialized.getPrimaryPartition());
- assertEquals(version, deserialized.getVersion());
+ assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
deserialized.instruction());
+ assertArrayEquals(hashedValue, deserialized.hash());
+ assertEquals(originalKey, deserialized.primaryKey());
+ assertEquals(primaryPartition, deserialized.primaryPartition());
+ assertEquals(version, deserialized.version());
}
@Test
@@ -76,11 +76,11 @@ public class SubscriptionWrapperSerdeTest {
final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer()
.deserialize(null, serialized);
- assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
deserialized.getInstruction());
- assertArrayEquals(hashedValue, deserialized.getHash());
- assertEquals(originalKey, deserialized.getPrimaryKey());
- assertEquals(primaryPartition, deserialized.getPrimaryPartition());
- assertEquals(version, deserialized.getVersion());
+ assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
deserialized.instruction());
+ assertArrayEquals(hashedValue, deserialized.hash());
+ assertEquals(originalKey, deserialized.primaryKey());
+ assertEquals(primaryPartition, deserialized.primaryPartition());
+ assertEquals(version, deserialized.version());
}
@Test
@@ -104,11 +104,11 @@ public class SubscriptionWrapperSerdeTest {
final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer()
.deserialize(null, serialized);
- assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
deserialized.getInstruction());
- assertArrayEquals(hashedValue, deserialized.getHash());
- assertEquals(originalKey, deserialized.getPrimaryKey());
- assertEquals(0, deserialized.getVersion());
- assertNull(deserialized.getPrimaryPartition());
+ assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
deserialized.instruction());
+ assertArrayEquals(hashedValue, deserialized.hash());
+ assertEquals(originalKey, deserialized.primaryKey());
+ assertEquals(0, deserialized.version());
+ assertNull(deserialized.primaryPartition());
}
@Test
@@ -128,11 +128,11 @@ public class SubscriptionWrapperSerdeTest {
final byte[] serialized = swSerde.serializer().serialize(null,
wrapper);
final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer().deserialize(null, serialized);
-
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
deserialized.getInstruction());
- assertArrayEquals(hashedValue, deserialized.getHash());
- assertEquals(originalKey, deserialized.getPrimaryKey());
- assertEquals(primaryPartition, deserialized.getPrimaryPartition());
- assertEquals(version, deserialized.getVersion());
+
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
deserialized.instruction());
+ assertArrayEquals(hashedValue, deserialized.hash());
+ assertEquals(originalKey, deserialized.primaryKey());
+ assertEquals(primaryPartition, deserialized.primaryPartition());
+ assertEquals(version, deserialized.version());
}
@Test
@@ -153,11 +153,11 @@ public class SubscriptionWrapperSerdeTest {
final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer()
.deserialize(null, serialized);
-
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
deserialized.getInstruction());
- assertArrayEquals(hashedValue, deserialized.getHash());
- assertEquals(originalKey, deserialized.getPrimaryKey());
- assertEquals(primaryPartition, deserialized.getPrimaryPartition());
- assertEquals(version, deserialized.getVersion());
+
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
deserialized.instruction());
+ assertArrayEquals(hashedValue, deserialized.hash());
+ assertEquals(originalKey, deserialized.primaryKey());
+ assertEquals(primaryPartition, deserialized.primaryPartition());
+ assertEquals(version, deserialized.version());
}
@Test
@@ -176,11 +176,11 @@ public class SubscriptionWrapperSerdeTest {
final byte[] serialized = swSerde.serializer().serialize(null,
wrapper);
final SubscriptionWrapper deserialized = (SubscriptionWrapper)
swSerde.deserializer().deserialize(null, serialized);
-
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
deserialized.getInstruction());
- assertArrayEquals(hashedValue, deserialized.getHash());
- assertEquals(originalKey, deserialized.getPrimaryKey());
- assertEquals(primaryPartition, deserialized.getPrimaryPartition());
- assertEquals(version, deserialized.getVersion());
+
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
deserialized.instruction());
+ assertArrayEquals(hashedValue, deserialized.hash());
+ assertEquals(originalKey, deserialized.primaryKey());
+ assertEquals(primaryPartition, deserialized.primaryPartition());
+ assertEquals(version, deserialized.version());
}
@Test