This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new d64f4fa NIFI-6760: When writing/reading the length of a DataFrame, do
so usin… (#3801)
d64f4fa is described below
commit d64f4fa942342482012b3caf4f8e3bf475685d03
Author: markap14 <[email protected]>
AuthorDate: Thu Oct 10 10:19:21 2019 -0400
NIFI-6760: When writing/reading the length of a DataFrame, do so usin…
(#3801)
* NIFI-6760: When writing/reading the length of a DataFrame, do so using a
4-byte integer instead of a 2-byte short. When using uncompressed data, we know
that the length of the DataFrame will be no more than 64 KB so a 2-byte short
is sufficient. However, if data is compresed, there's a chance that the
compressed form of the data will be larger than the uncompressed form (for
example, with random binary data or with encrypted data). In this situation, we
can end up overflowing the 2- [...]
* NIFI-6760: Fixed unit tests
---
.../queue/clustered/client/async/nio/LoadBalanceSession.java | 9 ++++-----
.../queue/clustered/server/StandardLoadBalanceProtocol.java | 4 ++--
.../queue/clustered/client/async/nio/TestLoadBalanceSession.java | 8 ++++----
.../queue/clustered/server/TestStandardLoadBalanceProtocol.java | 2 +-
4 files changed, 11 insertions(+), 12 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
index c3496b8..0169d07 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.java
@@ -319,16 +319,15 @@ public class LoadBalanceSession {
final byte[] compressed = compressDataFrame(byteBuffer,
bytesRead);
final int compressedMaxLen = compressed.length;
- buffer = ByteBuffer.allocate(3 + compressedMaxLen);
+ buffer = ByteBuffer.allocate(5 + compressedMaxLen);
buffer.put((byte)
LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
- buffer.putShort((short) compressedMaxLen);
+ buffer.putInt(compressedMaxLen);
buffer.put(compressed, 0, compressedMaxLen);
-
} else {
- buffer = ByteBuffer.allocate(3 + bytesRead);
+ buffer = ByteBuffer.allocate(5 + bytesRead);
buffer.put((byte)
LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
- buffer.putShort((short) bytesRead);
+ buffer.putInt(bytesRead);
buffer.put(byteBuffer, 0, bytesRead);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index d9fdd2a..c29a9ec 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -499,7 +499,7 @@ public class StandardLoadBalanceProtocol implements
LoadBalanceProtocol {
throw new IOException("Expected a Data Frame Indicator from Peer "
+ peerDescription + " but received a value of " + dataFrameIndicator);
}
- int dataFrameLength = in.readUnsignedShort();
+ int dataFrameLength = in.readInt();
logger.trace("Received Data Frame Length of {} for {}",
dataFrameLength, peerDescription);
byte[] buffer = getDataBuffer();
@@ -535,7 +535,7 @@ public class StandardLoadBalanceProtocol implements
LoadBalanceProtocol {
throw new IOException("Expected a Data Frame Indicator from
Peer " + peerDescription + " but received a value of " + dataFrameIndicator);
}
- dataFrameLength = in.readUnsignedShort();
+ dataFrameLength = in.readInt();
logger.trace("Received Data Frame Length of {} for {}",
dataFrameLength, peerDescription);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
index 20b6add..cfe4e2b 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
@@ -157,7 +157,7 @@ public class TestLoadBalanceSession {
expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage
start date
expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
- expectedDos.writeShort(5);
+ expectedDos.writeInt(5);
expectedDos.write("hello".getBytes());
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
@@ -171,7 +171,7 @@ public class TestLoadBalanceSession {
expectedDos.writeLong(flowFile2.getLineageStartDate()); // lineage
start date
expectedDos.writeLong(flowFile2.getEntryDate()); // entry date
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
- expectedDos.writeShort(8);
+ expectedDos.writeInt(8);
expectedDos.write("good-bye".getBytes());
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
@@ -246,12 +246,12 @@ public class TestLoadBalanceSession {
// first data frame
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
- expectedDos.writeShort(LoadBalanceSession.MAX_DATA_FRAME_SIZE);
+ expectedDos.writeInt(LoadBalanceSession.MAX_DATA_FRAME_SIZE);
expectedDos.write(Arrays.copyOfRange(content, 0,
LoadBalanceSession.MAX_DATA_FRAME_SIZE));
// second data frame
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
- expectedDos.writeShort(content.length -
LoadBalanceSession.MAX_DATA_FRAME_SIZE);
+ expectedDos.writeInt(content.length -
LoadBalanceSession.MAX_DATA_FRAME_SIZE);
expectedDos.write(Arrays.copyOfRange(content,
LoadBalanceSession.MAX_DATA_FRAME_SIZE, content.length));
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
index a5de8ee..57fc7d9 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
@@ -655,7 +655,7 @@ public class TestStandardLoadBalanceProtocol {
final int length = Math.min(content.length - offset, 65535);
out.write(DATA_FRAME_FOLLOWS);
- out.writeShort(length);
+ out.writeInt(length);
out.write(content, offset, length);
}