This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ae14738dea NIFI-12288 This closes #7950. Improved Long and Integer
handling in Utilities
ae14738dea is described below
commit ae14738dead33ca20dc2e779f5157118ee2b228f
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Oct 27 17:19:50 2023 -0500
NIFI-12288 This closes #7950. Improved Long and Integer handling in
Utilities
- Added explicit round in FormatUtils.makeWholeNumberTime()
- Removed unnecessary boxing in component descriptors
- Maintained long number tracking for releasable counts in Wait and Notify
Processors
Signed-off-by: Joseph Witt <[email protected]>
---
.../java/org/apache/nifi/util/FormatUtils.java | 5 +-
.../status/history/ProcessorStatusDescriptor.java | 88 +++++++++++-----------
.../RemoteProcessGroupStatusDescriptor.java | 45 +++++------
.../processors/standard/WaitNotifyProtocol.java | 9 ++-
.../standard/TestWaitNotifyProtocol.java | 60 ++++++---------
5 files changed, 95 insertions(+), 112 deletions(-)
diff --git
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
index e5da2520d5..8dec493f64 100644
---
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
+++
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
@@ -217,7 +217,7 @@ public class FormatUtils {
durationLong = Math.round(durationVal);
} else {
// Try reducing the size of the units to make the input a long
- List wholeResults = makeWholeNumberTime(durationVal,
specifiedTimeUnit);
+ List<?> wholeResults = makeWholeNumberTime(durationVal,
specifiedTimeUnit);
durationLong = (long) wholeResults.get(0);
specifiedTimeUnit = (TimeUnit) wholeResults.get(1);
}
@@ -247,7 +247,8 @@ public class FormatUtils {
protected static List<Object> makeWholeNumberTime(double decimal, TimeUnit
timeUnit) {
// If the value is already a whole number, return it and the current
time unit
if (decimal == Math.rint(decimal)) {
- return Arrays.asList(new Object[]{(long) decimal, timeUnit});
+ final long rounded = Math.round(decimal);
+ return Arrays.asList(new Object[]{rounded, timeUnit});
} else if (TimeUnit.NANOSECONDS == timeUnit) {
// The time unit is as small as possible
if (decimal < 1.0) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
index c8f51267ec..feff921299 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
@@ -57,7 +57,7 @@ public enum ProcessorStatusDescriptor {
"FlowFiles In (5 mins)",
"The number of FlowFiles that this Processor has pulled from its
queues in the past 5 minutes",
Formatter.COUNT,
- s -> Long.valueOf(s.getInputCount())),
+ s -> (long) s.getInputCount()),
OUTPUT_BYTES(
"outputBytes",
@@ -71,14 +71,14 @@ public enum ProcessorStatusDescriptor {
"FlowFiles Out (5 mins)",
"The number of FlowFiles that this Processor has transferred to
downstream queues in the past 5 minutes",
Formatter.COUNT,
- s -> Long.valueOf(s.getOutputCount())),
+ s -> (long) s.getOutputCount()),
TASK_COUNT(
"taskCount",
"Tasks (5 mins)",
"The number of tasks that this Processor has completed in the past 5
minutes",
Formatter.COUNT,
- s -> Long.valueOf(s.getInvocations())),
+ s -> (long) s.getInvocations()),
TASK_MILLIS(
"taskMillis",
@@ -100,7 +100,7 @@ public enum ProcessorStatusDescriptor {
"FlowFiles Removed (5 mins)",
"The total number of FlowFiles removed by this Processor in the last 5
minutes",
Formatter.COUNT,
- s -> Long.valueOf(s.getFlowFilesRemoved())),
+ s -> (long) s.getFlowFilesRemoved()),
AVERAGE_LINEAGE_DURATION(
"averageLineageDuration",
@@ -108,27 +108,27 @@ public enum ProcessorStatusDescriptor {
"The average amount of time that a FlowFile took to process (from
receipt until this Processor finished processing it) in the past 5 minutes.",
Formatter.DURATION,
s -> s.getAverageLineageDuration(TimeUnit.MILLISECONDS),
- new ValueReducer<StatusSnapshot, Long>() {
- @Override
- public Long reduce(final List<StatusSnapshot> values) {
- long millis = 0L;
- int count = 0;
-
- for (final StatusSnapshot snapshot : values) {
- final long removed =
snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor()).longValue();
- final long outputCount =
snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor()).longValue();
- final long processed = removed + outputCount;
-
- count += processed;
-
- final long avgMillis =
snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
- final long totalMillis = avgMillis * processed;
- millis += totalMillis;
- }
+ new ValueReducer<>() {
+ @Override
+ public Long reduce(final List<StatusSnapshot> values) {
+ long millis = 0L;
+ long count = 0;
+
+ for (final StatusSnapshot snapshot : values) {
+ final long removed =
snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor());
+ final long outputCount =
snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor());
+ final long processed = removed + outputCount;
+
+ count += processed;
+
+ final long avgMillis =
snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor());
+ final long totalMillis = avgMillis * processed;
+ millis += totalMillis;
+ }
- return count == 0 ? 0 : millis / count;
- }
- },
+ return count == 0 ? 0 : millis / count;
+ }
+ },
true
),
@@ -138,31 +138,31 @@ public enum ProcessorStatusDescriptor {
"The average number of nanoseconds it took this Processor to complete
a task, over the past 5 minutes",
Formatter.COUNT,
s -> s.getInvocations() == 0 ? 0 : s.getProcessingNanos() /
s.getInvocations(),
- new ValueReducer<StatusSnapshot, Long>() {
- @Override
- public Long reduce(final List<StatusSnapshot> values) {
- long procNanos = 0L;
- int invocations = 0;
-
- for (final StatusSnapshot snapshot : values) {
- final Long taskNanos =
snapshot.getStatusMetric(TASK_NANOS.getDescriptor());
- if (taskNanos != null) {
- procNanos += taskNanos.longValue();
+ new ValueReducer<>() {
+ @Override
+ public Long reduce(final List<StatusSnapshot> values) {
+ long procNanos = 0L;
+ int invocations = 0;
+
+ for (final StatusSnapshot snapshot : values) {
+ final Long taskNanos =
snapshot.getStatusMetric(TASK_NANOS.getDescriptor());
+ if (taskNanos != null) {
+ procNanos += taskNanos;
+ }
+
+ final Long taskInvocations =
snapshot.getStatusMetric(TASK_COUNT.getDescriptor());
+ if (taskInvocations != null) {
+ invocations += taskInvocations.intValue();
+ }
}
- final Long taskInvocations =
snapshot.getStatusMetric(TASK_COUNT.getDescriptor());
- if (taskInvocations != null) {
- invocations += taskInvocations.intValue();
+ if (invocations == 0) {
+ return 0L;
}
- }
- if (invocations == 0) {
- return 0L;
+ return procNanos / invocations;
}
-
- return procNanos / invocations;
- }
- },
+ },
true
);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
index 6b131d2055..c5c70c7798 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
@@ -57,25 +57,20 @@ public enum RemoteProcessGroupStatusDescriptor {
"Received Bytes Per Second",
"The data rate at which data was received from the remote system in
the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
- s -> s.getReceivedContentSize().longValue() / 300L),
+ s -> s.getReceivedContentSize() / 300L),
SENT_BYTES_PER_SECOND(
"sentBytesPerSecond",
"Sent Bytes Per Second",
"The data rate at which data was received from the remote system in
the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
- s -> s.getSentContentSize().longValue() / 300L),
+ s -> s.getSentContentSize() / 300L),
TOTAL_BYTES_PER_SECOND("totalBytesPerSecond",
"Total Bytes Per Second",
"The sum of the send and receive data rate from the remote system in
the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
- new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return
Long.valueOf((status.getReceivedContentSize().longValue() +
status.getSentContentSize().longValue()) / 300L);
- }
- }),
+ status -> (status.getReceivedContentSize() +
status.getSentContentSize()) / 300L),
AVERAGE_LINEAGE_DURATION(
"averageLineageDuration",
@@ -83,24 +78,24 @@ public enum RemoteProcessGroupStatusDescriptor {
"The average amount of time that a FlowFile took to process from
receipt to drop in the past 5 minutes. For Processors that do not terminate
FlowFiles, this value will be 0.",
Formatter.DURATION,
s -> s.getAverageLineageDuration(TimeUnit.MILLISECONDS),
- new ValueReducer<StatusSnapshot, Long>() {
- @Override
- public Long reduce(final List<StatusSnapshot> values) {
- long millis = 0L;
- int count = 0;
-
- for (final StatusSnapshot snapshot : values) {
- final long sent =
snapshot.getStatusMetric(SENT_COUNT.getDescriptor()).longValue();
- count += sent;
-
- final long avgMillis =
snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
- final long totalMillis = avgMillis * sent;
- millis += totalMillis;
+ new ValueReducer<>() {
+ @Override
+ public Long reduce(final List<StatusSnapshot> values) {
+ long millis = 0L;
+ long count = 0;
+
+ for (final StatusSnapshot snapshot : values) {
+ final long sent =
snapshot.getStatusMetric(SENT_COUNT.getDescriptor());
+ count += sent;
+
+ final long avgMillis =
snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor());
+ final long totalMillis = avgMillis * sent;
+ millis += totalMillis;
+ }
+
+ return count == 0 ? 0 : millis / count;
}
-
- return count == 0 ? 0 : millis / count;
- }
- });
+ });
private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
index 2c9c9fdfcc..e18a1f4776 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
@@ -70,7 +70,7 @@ public class WaitNotifyProtocol {
transient private AtomicCacheEntry<String, String, Object> cachedEntry;
private Map<String, Long> counts = new HashMap<>();
private Map<String, String> attributes = new HashMap<>();
- private int releasableCount = 0;
+ private long releasableCount = 0;
public Map<String, Long> getCounts() {
return counts;
@@ -110,11 +110,11 @@ public class WaitNotifyProtocol {
return count != null ? count : 0;
}
- public int getReleasableCount() {
+ public long getReleasableCount() {
return releasableCount;
}
- public void setReleasableCount(int releasableCount) {
+ public void setReleasableCount(long releasableCount) {
this.releasableCount = releasableCount;
}
@@ -155,7 +155,8 @@ public class WaitNotifyProtocol {
}
}
- int releaseCount = Math.min(releasableCount, candidateSize);
+ // Convert to integer for list index sizing
+ final int releaseCount = Math.toIntExact(Math.min(releasableCount,
candidateSize));
released.accept(candidates.subList(0, releaseCount));
waiting.accept(candidates.subList(releaseCount, candidateSize));
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
index 5429bf3f43..d68f6699e3 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
@@ -44,8 +44,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -56,8 +56,8 @@ public class TestWaitNotifyProtocol {
private final ObjectMapper mapper = new ObjectMapper();
private AtomicDistributedMapCacheClient<Long> cache;
- @SuppressWarnings("unchecked")
- private final Answer successfulReplace = invocation -> {
+
+ private final Answer<?> successfulReplace = invocation -> {
final AtomicCacheEntry<String, String, Long> entry =
invocation.getArgument(0);
cacheEntries.put(entry.getKey(), new
AtomicCacheEntry<>(entry.getKey(), entry.getValue(),
entry.getRevision().orElse(0L) + 1));
return true;
@@ -83,11 +83,7 @@ public class TestWaitNotifyProtocol {
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
final String signalId = "signal-id";
- try {
- protocol.notify(signalId, "a", 1, null);
- fail("Notify should fail after retrying few times.");
- } catch (ConcurrentModificationException e) {
- }
+ assertThrows(ConcurrentModificationException.class, () ->
protocol.notify(signalId, "a", 1, null));
}
@Test
@@ -237,7 +233,7 @@ public class TestWaitNotifyProtocol {
attributesSerializer.serialize(cachedAttributes, bos);
final String signalId = "old-entry";
- cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, new
String(bos.toByteArray(), StandardCharsets.UTF_8), 0L));
+ cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId,
bos.toString(StandardCharsets.UTF_8), 0L));
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
final Signal signal = protocol.getSignal(signalId);
@@ -248,12 +244,7 @@ public class TestWaitNotifyProtocol {
assertEquals("value3", signal.getAttributes().get("key3"));
cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId,
"UNSUPPORTED_FORMAT", 0L));
- try {
- protocol.getSignal(signalId);
- fail("Should fail since cached value was not in expected format.");
- } catch (DeserializationException e) {
- }
-
+ assertThrows(DeserializationException.class, () ->
protocol.getSignal(signalId));
}
@Test
@@ -263,14 +254,11 @@ public class TestWaitNotifyProtocol {
final List<Integer> released = new ArrayList<>();
final List<Integer> waiting = new ArrayList<>();
- // Test default name.
- final String counterName = DEFAULT_COUNT_NAME;
-
final BiConsumer<Long, Integer> releaseCandidate =
(requiredCountForPass, releasableCandidatePerPass) -> {
released.clear();
waiting.clear();
- signal.releaseCandidates(counterName, requiredCountForPass,
releasableCandidatePerPass, candidates,
- r -> released.addAll(r), w -> waiting.addAll(w));
+ signal.releaseCandidates(DEFAULT_COUNT_NAME, requiredCountForPass,
releasableCandidatePerPass, candidates,
+ released::addAll, waiting::addAll);
};
final Field releasableCount =
Signal.class.getDeclaredField("releasableCount");
@@ -281,7 +269,7 @@ public class TestWaitNotifyProtocol {
assertEquals(0, released.size());
assertEquals(10, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME));
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// Counter is not enough yet.
signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
@@ -289,7 +277,7 @@ public class TestWaitNotifyProtocol {
assertEquals(0, released.size());
assertEquals(10, waiting.size());
assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME)); // Counter
incremented, but not enough
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target.
signal.getCounts().put(DEFAULT_COUNT_NAME, 3L);
@@ -297,7 +285,7 @@ public class TestWaitNotifyProtocol {
assertEquals(1, released.size());
assertEquals(9, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was
converted into 1 release
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two candidates.
signal.getCounts().put(DEFAULT_COUNT_NAME, 6L);
@@ -305,7 +293,7 @@ public class TestWaitNotifyProtocol {
assertEquals(2, released.size());
assertEquals(8, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was
converted into 1 release
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two candidates, and remainder is 2.
signal.getCounts().put(DEFAULT_COUNT_NAME, 11L);
@@ -313,7 +301,7 @@ public class TestWaitNotifyProtocol {
assertEquals(3, released.size()); // 11 / 3 = 3
assertEquals(7, waiting.size());
assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 11 % 3 = 2
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two pass count and each pass can
release 2 candidates.
signal.getCounts().put(DEFAULT_COUNT_NAME, 6L);
@@ -321,7 +309,7 @@ public class TestWaitNotifyProtocol {
assertEquals(4, released.size()); // (6 / 3) * 2 = 4
assertEquals(6, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // 6 % 3 = 0
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// If there are counts more than enough to release current candidates,
unused releasableCount should remain.
signal.getCounts().put(DEFAULT_COUNT_NAME, 50L);
@@ -329,11 +317,9 @@ public class TestWaitNotifyProtocol {
assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10.
assertEquals(0, waiting.size());
assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 50 % 3 = 2.
- assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
-
+ assertEquals(22, releasableCount.getLong(signal)); // 32 - 10 = 22.
}
-
@Test
public void testReleaseCandidateTotal() throws Exception {
final List<Integer> candidates = IntStream.range(0,
10).boxed().collect(Collectors.toList());
@@ -348,7 +334,7 @@ public class TestWaitNotifyProtocol {
released.clear();
waiting.clear();
signal.releaseCandidates(emptyCounterName, requiredCountForPass,
releasableCandidatePerPass, candidates,
- r -> released.addAll(r), w -> waiting.addAll(w));
+ released::addAll, waiting::addAll);
};
final String counterA = "counterA";
@@ -364,7 +350,7 @@ public class TestWaitNotifyProtocol {
assertEquals(10, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName));
assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// Counter is not enough yet.
signal.getCounts().put(counterA, 1L);
@@ -374,7 +360,7 @@ public class TestWaitNotifyProtocol {
assertEquals(10, waiting.size());
assertEquals(1, signal.getCount(emptyCounterName)); // Counter
incremented, but not enough
assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target.
signal.getCounts().put(counterA, 1L);
@@ -386,7 +372,7 @@ public class TestWaitNotifyProtocol {
assertEquals(9, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was
converted into 1 release
assertEquals(-3, signal.getCount(CONSUMED_COUNT_NAME));
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two candidates.
signal.getCounts().put(counterA, 1L);
@@ -398,7 +384,7 @@ public class TestWaitNotifyProtocol {
assertEquals(8, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was
converted into 1 release
assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two candidates, and remainder is 2.
signal.getCounts().put(counterA, 3L);
@@ -410,7 +396,7 @@ public class TestWaitNotifyProtocol {
assertEquals(7, waiting.size());
assertEquals(2, signal.getCount(emptyCounterName));
assertEquals(-9, signal.getCount(CONSUMED_COUNT_NAME));
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two pass count and each pass can
release 2 candidates.
signal.getCounts().put(counterA, 1L);
@@ -422,7 +408,7 @@ public class TestWaitNotifyProtocol {
assertEquals(6, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName));
assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
- assertEquals(0, releasableCount.getInt(signal));
+ assertEquals(0, releasableCount.getLong(signal));
// If there are counts more than enough to release current candidates,
unused releasableCount should remain.
signal.getCounts().put(counterA, 10L);
@@ -434,7 +420,7 @@ public class TestWaitNotifyProtocol {
assertEquals(0, waiting.size());
assertEquals(2, signal.getCount(emptyCounterName)); // 50 % 3 = 2.
assertEquals(-48, signal.getCount(CONSUMED_COUNT_NAME)); // 50 % 3 = 2.
- assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
+ assertEquals(22, releasableCount.getLong(signal)); // 32 - 10 = 22.
}