This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 16e3c52 Fix bug in AssignmentInfo#encode and add additional logging (#7545)
16e3c52 is described below
commit 16e3c52f1d8b12106add2db87c6c622bbc932ac0
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Oct 17 15:09:55 2019 -0700
Fix bug in AssignmentInfo#encode and add additional logging (#7545)
Same as #7537, but targeted at 2.3 for cherry-pick.
Reviewers: Bill Bejeck <[email protected]>
---
.../internals/StreamsPartitionAssignor.java | 24 ++++++++++++++++------
.../internals/assignment/AssignmentInfo.java | 16 +++++++--------
.../internals/assignment/SubscriptionInfo.java | 9 ++++----
.../internals/assignment/AssignmentInfoTest.java | 9 ++++++++
.../internals/assignment/SubscriptionInfoTest.java | 10 +++++++++
5 files changed, 49 insertions(+), 19 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index f99866e..e98ae3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -416,9 +416,15 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
minReceivedMetadataVersion = usedVersion;
}
- final int latestSupportedVersion = info.latestSupportedVersion();
- if (latestSupportedVersion < minSupportedMetadataVersion) {
- minSupportedMetadataVersion = latestSupportedVersion;
+ final int supportedVersion = info.latestSupportedVersion();
+
+ if (supportedVersion < minSupportedMetadataVersion) {
+ log.debug("Downgrade the current minimum supported version {}
to the smaller seen supported version {}",
+ minSupportedMetadataVersion, supportedVersion);
+ minSupportedMetadataVersion = supportedVersion;
+ } else {
+ log.debug("Current minimum supported version remains at {},
last seen supported version was {}",
+ minSupportedMetadataVersion, supportedVersion);
}
// create the new client metadata if necessary
@@ -449,10 +455,15 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
}
if (minReceivedMetadataVersion <
SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
- log.info("Downgrading metadata to version {}. Latest supported
version is {}.",
+ log.info("Downgrade metadata to version {}. Latest supported
version is {}.",
minReceivedMetadataVersion,
SubscriptionInfo.LATEST_SUPPORTED_VERSION);
}
+ if (minSupportedMetadataVersion <
SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+ log.info("Downgrade latest supported metadata to version {}.
Latest supported version is {}.",
+ minSupportedMetadataVersion,
+ SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+ }
log.debug("Constructed client metadata {} from the member
subscriptions.", clientsMetadata);
@@ -884,9 +895,10 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
log.info(
"Sent a version {} subscription and got version {}
assignment back (successful version probing). "
+
- "Downgrade subscription metadata to commonly supported
version and trigger new rebalance.",
+ "Downgrade subscription metadata to commonly supported
version {} and trigger new rebalance.",
usedSubscriptionMetadataVersion,
- receivedAssignmentMetadataVersion
+ receivedAssignmentMetadataVersion,
+ latestCommonlySupportedVersion
);
usedSubscriptionMetadataVersion =
latestCommonlySupportedVersion;
return true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 5c7b037..3521def 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -143,7 +143,7 @@ public class AssignmentInfo {
break;
default:
throw new IllegalStateException("Unknown metadata version:
" + usedVersion
- + "; latest supported version: " +
LATEST_SUPPORTED_VERSION);
+ + "; latest commonly supported version: " +
commonlySupportedVersion);
}
out.flush();
@@ -206,14 +206,14 @@ public class AssignmentInfo {
private void encodeVersionThree(final DataOutputStream out) throws
IOException {
out.writeInt(3);
- out.writeInt(LATEST_SUPPORTED_VERSION);
+ out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
encodePartitionsByHost(out);
}
private void encodeVersionFour(final DataOutputStream out) throws
IOException {
out.writeInt(4);
- out.writeInt(LATEST_SUPPORTED_VERSION);
+ out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
encodePartitionsByHost(out);
out.writeInt(errCode);
@@ -230,7 +230,7 @@ public class AssignmentInfo {
final AssignmentInfo assignmentInfo;
final int usedVersion = in.readInt();
- final int latestSupportedVersion;
+ final int commonlySupportedVersion;
switch (usedVersion) {
case 1:
assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
@@ -241,13 +241,13 @@ public class AssignmentInfo {
decodeVersionTwoData(assignmentInfo, in);
break;
case 3:
- latestSupportedVersion = in.readInt();
- assignmentInfo = new AssignmentInfo(usedVersion,
latestSupportedVersion);
+ commonlySupportedVersion = in.readInt();
+ assignmentInfo = new AssignmentInfo(usedVersion,
commonlySupportedVersion);
decodeVersionThreeData(assignmentInfo, in);
break;
case 4:
- latestSupportedVersion = in.readInt();
- assignmentInfo = new AssignmentInfo(usedVersion,
latestSupportedVersion);
+ commonlySupportedVersion = in.readInt();
+ assignmentInfo = new AssignmentInfo(usedVersion,
commonlySupportedVersion);
decodeVersionFourData(assignmentInfo, in);
break;
default:
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index b4ad19f..57db25b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -207,11 +207,10 @@ public class SubscriptionInfo {
private ByteBuffer encodeVersionThree() {
final byte[] endPointBytes = prepareUserEndPoint();
-
final ByteBuffer buf =
ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
+ buf.putInt(3);
+ buf.putInt(latestSupportedVersion);
- buf.putInt(3); // used version
- buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
encodeTasks(buf, standbyTasks);
@@ -226,7 +225,7 @@ public class SubscriptionInfo {
final ByteBuffer buf =
ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
buf.putInt(4); // used version
- buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
+ buf.putInt(latestSupportedVersion); // supported version
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
encodeTasks(buf, standbyTasks);
@@ -273,7 +272,7 @@ public class SubscriptionInfo {
default:
latestSupportedVersion = data.getInt();
subscriptionInfo = new SubscriptionInfo(usedVersion,
latestSupportedVersion);
- log.info("Unable to decode subscription data: used version:
{}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
+ log.info("Unable to decode subscription data: used version:
{}; latest supported version: {}", usedVersion, latestSupportedVersion);
}
return subscriptionInfo;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index 8b99065..3ed2f71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -94,4 +94,13 @@ public class AssignmentInfoTest {
final AssignmentInfo expectedInfo = new AssignmentInfo(4,
AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks,
globalAssignment, 2);
assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
}
+
+ @Test
+ public void shouldEncodeAndDecodeSmallerCommonlySupportedVersion() {
+ final int usedVersion = AssignmentInfo.LATEST_SUPPORTED_VERSION - 1;
+ final int commonlySupportedVersion =
AssignmentInfo.LATEST_SUPPORTED_VERSION - 1;
+ final AssignmentInfo info = new AssignmentInfo(usedVersion,
commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 0);
+ final AssignmentInfo expectedInfo = new AssignmentInfo(usedVersion,
commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 0);
+ assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 2a75c57..6492cc0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -90,6 +90,16 @@ public class SubscriptionInfoTest {
assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1,
info.latestSupportedVersion());
}
+ @Test
+ public void shouldEncodeAndDecodeSmallerLatestSupportedVersion() {
+ final int usedVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION - 1;
+ final int latestSupportedVersion =
SubscriptionInfo.LATEST_SUPPORTED_VERSION - 1;
+
+ final SubscriptionInfo info = new SubscriptionInfo(usedVersion,
latestSupportedVersion, processId, activeTasks, standbyTasks, "localhost:80");
+ final SubscriptionInfo expectedInfo = new
SubscriptionInfo(usedVersion, latestSupportedVersion, processId, activeTasks,
standbyTasks, "localhost:80");
+ assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
+ }
+
private ByteBuffer encodeFutureVersion() {
final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
+ 4 /* supported version
*/);