pvillard31 commented on code in PR #98:
URL: https://github.com/apache/nifi-api/pull/98#discussion_r3474948475
##########
src/main/java/org/apache/nifi/components/connector/Connector.java:
##########
@@ -242,4 +242,5 @@ public interface Connector {
* @return a Future that will be completed when the draining is complete
*/
CompletableFuture<Void> drainFlowFiles(FlowContext flowContext);
+
Review Comment:
Is the extra blank line at the end of Connector intended, or a stray change?
##########
src/test/java/org/apache/nifi/components/TestBacklog.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.components;
+
+import org.apache.nifi.components.Backlog.Precision;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestBacklog {
+
+ @Test
+ public void
testPlusSumsDimensionsAndKeepsExactWhenBothSidesReportTheSameShape() {
+ final Backlog left =
Backlog.builder().flowFiles(10L).bytes(100L).build();
+ final Backlog right =
Backlog.builder().flowFiles(5L).bytes(50L).build();
+
+ final Backlog combined = left.plus(right);
+
+ assertEquals(OptionalLong.of(15L), combined.getFlowFileCount());
+ assertEquals(OptionalLong.of(150L), combined.getByteCount());
+ assertFalse(combined.getRecordCount().isPresent());
+ assertEquals(Precision.EXACT, combined.getPrecision());
+ }
+
+ @Test
+ public void
testPlusDowngradesToAtLeastWhenOneSideOmitsADimensionTheOtherReports() {
+ // Left knows only flowFiles; right knows only bytes. The result
should still carry the known
+ // numeric values, but the precision must be AT_LEAST because each
side is "unknown" with
+ // respect to the dimension the other side reported. Treating
"unknown" as zero would let the
+ // combined Backlog falsely claim EXACT completeness.
+ final Backlog left = Backlog.flowFiles(10L);
+ final Backlog right = Backlog.bytes(100L);
+
+ final Backlog combined = left.plus(right);
+
+ assertEquals(OptionalLong.of(10L), combined.getFlowFileCount());
+ assertEquals(OptionalLong.of(100L), combined.getByteCount());
+ assertFalse(combined.getRecordCount().isPresent());
+ assertEquals(Precision.AT_LEAST, combined.getPrecision());
+ }
+
+ @Test
+ public void
testPlusDowngradesToAtLeastWhenAsymmetricEvenIfBothSidesAreExact() {
+ // Both sides are EXACT individually but report different sets of
dimensions. The combined
+ // view cannot be EXACT because each side is silent about a dimension
the other side knows.
+ final Backlog left =
Backlog.builder().flowFiles(1L).bytes(2L).precision(Precision.EXACT).build();
+ final Backlog right =
Backlog.builder().flowFiles(3L).bytes(4L).records(5L).precision(Precision.EXACT).build();
+
+ final Backlog combined = left.plus(right);
+
+ assertEquals(OptionalLong.of(4L), combined.getFlowFileCount());
+ assertEquals(OptionalLong.of(6L), combined.getByteCount());
+ assertEquals(OptionalLong.of(5L), combined.getRecordCount());
+ assertEquals(Precision.AT_LEAST, combined.getPrecision());
+ }
+
+ @Test
+ public void testPlusPropagatesAtLeastFromEitherOperand() {
+ final Backlog left =
Backlog.builder().flowFiles(1L).precision(Precision.AT_LEAST).build();
+ final Backlog right =
Backlog.builder().flowFiles(2L).precision(Precision.EXACT).build();
+
+ assertEquals(Precision.AT_LEAST, left.plus(right).getPrecision());
+ assertEquals(Precision.AT_LEAST, right.plus(left).getPrecision());
+ }
+
+ @Test
+ public void
testPlusUsesEarlierLastCaughtUpAndKeepsOnlySideWhenOtherMissing() {
+ final Instant earlier = Instant.parse("2025-01-01T00:00:00Z");
+ final Instant later = Instant.parse("2025-01-02T00:00:00Z");
+
+ final Backlog withEarlier =
Backlog.builder().flowFiles(0L).lastCaughtUp(earlier).build();
+ final Backlog withLater =
Backlog.builder().flowFiles(0L).lastCaughtUp(later).build();
+ assertEquals(earlier,
withEarlier.plus(withLater).getLastCaughtUp().orElseThrow());
+
+ final Backlog withoutTimestamp = Backlog.flowFiles(0L);
+ assertEquals(later,
withoutTimestamp.plus(withLater).getLastCaughtUp().orElseThrow());
+ }
+
+ @Test
+ public void testPlusOverflowOnLongSum() {
+ final Backlog left = Backlog.flowFiles(Long.MAX_VALUE);
+ final Backlog right = Backlog.flowFiles(1L);
+ assertThrows(ArithmeticException.class, () -> left.plus(right));
+ }
+
+ @Test
+ public void testCaughtUpFactoryProducesZerosAndTimestamp() {
Review Comment:
Can we add a small test for the builder rejecting negative flowFiles, bytes,
and records values?
##########
src/main/java/org/apache/nifi/components/Backlog.java:
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * <p>
+ * An immutable description of how much work remains for a Connector or
Processor
+ * to consume from its source system. A {@code Backlog} may report any
combination of
+ * FlowFile count, byte count, and record count, and may also indicate the
most recent
+ * time at which the component observed itself as fully caught up with the
source.
+ * </p>
+ *
+ * <p>
+ * All four dimensions are optional and independent. A component reports
only those
+ * dimensions that it can determine. For example, a Connector pulling from
an object
+ * store may know both the number of objects remaining and the total bytes
those
+ * objects represent, while a Connector pulling from a streaming system
may know only
+ * the number of records remaining.
+ * </p>
+ *
+ * <p>
+ * The numeric dimensions are interpreted in light of the {@link
Precision} attribute,
+ * which applies to the entire {@code Backlog}. {@link Precision#EXACT}
(the default)
+ * means the values are exact counts; {@link Precision#AT_LEAST} means the
values are
+ * lower bounds and the real counts may be higher.
+ * </p>
+ *
+ * <p>
+ * The {@link #getLastCaughtUp() lastCaughtUp} timestamp is unaffected by
precision and
+ * is always treated as exact. It represents the most recent moment the
component
+ * determined that there was zero data remaining to pull from the source.
It is computed
+ * by the component using whatever knowledge it has — typically during its
normal polling
+ * of the source — and is not merely the time a backlog query last
returned zero.
+ * </p>
+ *
+ * <p>
+ * Backlogs are composed via {@link #plus(Backlog)}; see that method for
the per-field
+ * combination rules.
+ * </p>
+ *
+ * <p>
+ * <b>Implementation Note:</b> This API is currently experimental, as it
is under very active
+ * development. As such, it is subject to change without notice between
releases.
+ * </p>
+ */
+public final class Backlog {
+
// Each numeric dimension is independently optional: an empty value means "not reported",
// which plus() treats as unknown rather than zero.
private final OptionalLong flowFileCount;
private final OptionalLong byteCount;
private final OptionalLong recordCount;
private final Optional<Instant> lastCaughtUp;
private final Precision precision;

/**
 * Snapshots every field of the given builder into this immutable instance; the builder
 * itself is not retained.
 */
private Backlog(final Builder source) {
    this.precision = source.precision;
    this.lastCaughtUp = source.lastCaughtUp;
    this.recordCount = source.recordCount;
    this.byteCount = source.byteCount;
    this.flowFileCount = source.flowFileCount;
}
+
+ /**
+ * @return the number of FlowFiles remaining on the source, if known
+ */
+ public OptionalLong getFlowFileCount() {
+ return flowFileCount;
+ }
+
+ /**
+ * @return the total number of bytes remaining on the source, if known
+ */
+ public OptionalLong getByteCount() {
+ return byteCount;
+ }
+
+ /**
+ * @return the number of records remaining on the source, if known
+ */
+ public OptionalLong getRecordCount() {
+ return recordCount;
+ }
+
+ /**
+ * @return the most recent moment the component observed itself as fully
caught up with the source, if known
+ */
+ public Optional<Instant> getLastCaughtUp() {
+ return lastCaughtUp;
+ }
+
+ /**
+ * @return the {@link Precision} of the numeric dimensions of this
Backlog. Never null.
+ */
+ public Precision getPrecision() {
+ return precision;
+ }
+
+ /**
+ * <p>
+ * Combines this Backlog with another to produce a new {@code Backlog}
describing
+ * the union of the two. Useful for composing a flow-wide backlog from
multiple sources
+ * such as multiple Processor reports and queue snapshots.
+ * </p>
+ *
+ * <p>
+ * Combination rules:
+ * </p>
+ * <ul>
+ * <li>
+ * Numeric dimensions ({@code flowFileCount}, {@code byteCount},
{@code recordCount}):
+ * if both sides have a value, the result is their sum. If only
one side has a value,
+ * it is carried through but the combined {@link Precision} is
downgraded to
+ * {@link Precision#AT_LEAST} (see the precision rule below),
because the side that did
+ * not report the dimension is unknown rather than zero. If
neither side has a value,
+ * the field stays empty.
+ * </li>
+ * <li>
+ * {@link #getLastCaughtUp() lastCaughtUp}: <b>not summable.</b>
If both sides have a value,
+ * the result is the earlier of the two timestamps — the more
conservative claim about how
+ * recently the system was fully caught up. If only one side has
it, that one is carried
+ * through.
+ * </li>
+ * <li>
+ * {@link #getPrecision() precision}: the result is {@link
Precision#EXACT} only when both
+ * sides are {@code EXACT} <i>and</i> both sides report the same
set of populated numeric
+ * dimensions. Otherwise the result is {@link Precision#AT_LEAST}.
Any uncertainty in either
+ * operand — including a missing dimension on one side that the
other side reported —
+ * taints the result, because "unknown" must not be treated as
zero.
+ * </li>
+ * </ul>
+ *
+ * @param other the other Backlog to combine with this one
+ * @return a new Backlog representing the combination
+ * @throws ArithmeticException if a numeric sum would overflow {@code long}
+ */
+ public Backlog plus(final Backlog other) {
+ Objects.requireNonNull(other, "other Backlog must not be null");
+
+ final OptionalLong sumFlowFiles = sumOptional(flowFileCount,
other.flowFileCount);
+ final OptionalLong sumBytes = sumOptional(byteCount, other.byteCount);
+ final OptionalLong sumRecords = sumOptional(recordCount,
other.recordCount);
+ final Optional<Instant> earliestCaughtUp = earlierOf(lastCaughtUp,
other.lastCaughtUp);
+
+ // A dimension that one side reports and the other does not is
"unknown" on the omitting side,
+ // not zero. Carrying the known value forward and still reporting
EXACT would let the result
+ // claim completeness it cannot back up, so any such asymmetry forces
AT_LEAST.
+ final boolean dimensionsAsymmetric = flowFileCount.isPresent() !=
other.flowFileCount.isPresent()
+ || byteCount.isPresent() != other.byteCount.isPresent()
+ || recordCount.isPresent() != other.recordCount.isPresent();
+ final boolean bothExact = precision == Precision.EXACT &&
other.precision == Precision.EXACT;
+ final Precision combinedPrecision = (bothExact &&
!dimensionsAsymmetric) ? Precision.EXACT : Precision.AT_LEAST;
+
+ final Builder builder = new Builder().precision(combinedPrecision);
+ if (sumFlowFiles.isPresent()) {
+ builder.flowFiles(sumFlowFiles.getAsLong());
+ }
+ if (sumBytes.isPresent()) {
+ builder.bytes(sumBytes.getAsLong());
+ }
+ if (sumRecords.isPresent()) {
+ builder.records(sumRecords.getAsLong());
+ }
+ earliestCaughtUp.ifPresent(builder::lastCaughtUp);
+ return builder.build();
+ }
+
+ private static OptionalLong sumOptional(final OptionalLong left, final
OptionalLong right) {
+ if (left.isPresent() && right.isPresent()) {
+ return OptionalLong.of(Math.addExact(left.getAsLong(),
right.getAsLong()));
+ }
+ if (left.isPresent()) {
+ return left;
+ }
+ if (right.isPresent()) {
+ return right;
+ }
+ return OptionalLong.empty();
+ }
+
+ private static Optional<Instant> earlierOf(final Optional<Instant> left,
final Optional<Instant> right) {
+ if (left.isPresent() && right.isPresent()) {
+ return left.get().isBefore(right.get()) ? left : right;
+ }
+ if (left.isPresent()) {
+ return left;
+ }
+ return right;
+ }
+
+ /**
+ * Creates a Backlog whose only populated dimension is the FlowFile count,
with {@link Precision#EXACT}.
+ *
+ * @param count the number of FlowFiles remaining on the source; must be
non-negative
+ * @return a new Backlog
+ */
+ public static Backlog flowFiles(final long count) {
+ return new Builder().flowFiles(count).build();
+ }
+
+ /**
+ * Creates a Backlog whose only populated dimension is the byte count,
with {@link Precision#EXACT}.
+ *
+ * @param bytes the number of bytes remaining on the source; must be
non-negative
+ * @return a new Backlog
+ */
+ public static Backlog bytes(final long bytes) {
+ return new Builder().bytes(bytes).build();
+ }
+
+ /**
+ * Creates a Backlog whose only populated dimension is the record count,
with {@link Precision#EXACT}.
+ *
+ * @param count the number of records remaining on the source; must be
non-negative
+ * @return a new Backlog
+ */
+ public static Backlog records(final long count) {
+ return new Builder().records(count).build();
+ }
+
+ /**
+ * Creates a Backlog whose only populated dimension is the {@code
lastCaughtUp} timestamp.
+ * Useful for combining with a count-only Backlog via {@link
#plus(Backlog)}.
+ *
+ * @param instant the moment at which the component was last observed as
fully caught up
+ * @return a new Backlog
+ */
+ public static Backlog lastCaughtUp(final Instant instant) {
Review Comment:
lastCaughtUp(Instant) is documented as useful for combining with a
count-only Backlog, but plus() downgrades any such combination to
AT_LEAST. Is that downgrade intended, given that the timestamp adds no
numeric uncertainty?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]