[
https://issues.apache.org/jira/browse/BEAM-9977?focusedWorklogId=435694&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435694
]
ASF GitHub Bot logged work on BEAM-9977:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/May/20 21:05
Start Date: 20/May/20 21:05
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #11715:
URL: https://github.com/apache/beam/pull/11715#discussion_r428304723
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+/**
+ * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code Long.MAX_VALUE} is
+ * used as the end of the range to indicate infinity.
+ *
+ * <p>An offset range is considered growable when the end offset could grow (or change) during
+ * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ *
+ * <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public class GrowableOffsetRangeTracker extends OffsetRangeTracker {
+ /**
+ * Provides the estimated end offset of the range.
+ *
+ * <p>{@link #estimate} is called to give the end offset when {@link #trySplit} or {@link
+ * #getProgress} is invoked. The end offset is exclusive for the range. The estimated end is not
+ * required to monotonically increase as it will only be taken into consideration when the
+ * estimated end offset is larger than the current position. Returning {@code Long.MAX_VALUE} as
+ * the estimate implies the largest possible position for the range is {@code Long.MAX_VALUE - 1}.
+ * Return {@code Long.MIN_VALUE} if an estimate can not be provided.
+ *
+ * <p>Providing a good estimate is important for an accurate progress signal and will impact
+ * splitting decisions by the runner.
+ *
+ * <p>If {@link #estimate} is expensive to compute, consider wrapping the implementation with
+ * {@link Suppliers#memoizeWithExpiration} or equivalent as an optimization.
+ *
+ * <p>TODO(BEAM-10032): Also consider using {@link RangeEndEstimator} when the range is not ended
+ * with {@code Long.MAX_VALUE}.
+ */
+ @FunctionalInterface
+ public interface RangeEndEstimator {
+ long estimate();
+ }
+
+ private final RangeEndEstimator rangeEndEstimator;
+
+ public GrowableOffsetRangeTracker(long start, RangeEndEstimator rangeEndEstimator) {
+ super(new OffsetRange(start, Long.MAX_VALUE));
+ this.rangeEndEstimator = checkNotNull(rangeEndEstimator);
+ }
+
+ @Override
+ public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+ // If current tracking range is no longer growable, split it as a normal range.
+ if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) {
+ return super.trySplit(fractionOfRemainder);
+ }
+ // If current range has been done, there is no more space to split.
+ if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) {
+ return null;
+ }
+ BigDecimal cur =
+ (lastAttemptedOffset == null)
+ ? BigDecimal.valueOf(range.getFrom()).subtract(BigDecimal.ONE, MathContext.DECIMAL128)
+ : BigDecimal.valueOf(lastAttemptedOffset);
+
+ // Fetch the estimated end offset. If the estimated end is smaller than the next offset, use
+ // the next offset as end.
+ BigDecimal estimateRangeEnd =
+ BigDecimal.valueOf(rangeEndEstimator.estimate())
+ .max(cur.add(BigDecimal.ONE, MathContext.DECIMAL128));
+
+ // Convert to BigDecimal in computation to prevent overflow, which may result in loss of
+ // precision.
+ // split = cur + max(1, (estimateRangeEnd - cur) * fractionOfRemainder)
+ BigDecimal splitPos =
+ cur.add(
+ estimateRangeEnd
+ .subtract(cur, MathContext.DECIMAL128)
+ .multiply(BigDecimal.valueOf(fractionOfRemainder), MathContext.DECIMAL128)
+ .max(BigDecimal.ONE),
+ MathContext.DECIMAL128);
+ long split = splitPos.longValue();
+ if (split > estimateRangeEnd.longValue()) {
+ return null;
+ }
+ OffsetRange res = new OffsetRange(split, range.getTo());
+ this.range = new OffsetRange(range.getFrom(), split);
+ return SplitResult.of(range, res);
+ }
+
+ @Override
+ public Progress getProgress() {
+ // If current tracking range is no longer growable, get progress as a normal range.
+ if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) {
+ return super.getProgress();
+ }
+
+ // Convert to BigDecimal in computation to prevent overflow, which may result in loss of
+ // precision.
+ BigDecimal estimateRangeEnd = BigDecimal.valueOf(rangeEndEstimator.estimate());
+
+ if (lastAttemptedOffset == null) {
+ return Progress.from(
+ 0,
+ estimateRangeEnd
+ .subtract(BigDecimal.valueOf(range.getFrom()), MathContext.DECIMAL128)
+ .max(BigDecimal.ZERO)
+ .doubleValue());
+ }
+
+ BigDecimal workRemaining =
+ estimateRangeEnd
+ .subtract(BigDecimal.valueOf(lastAttemptedOffset), MathContext.DECIMAL128)
+ .max(BigDecimal.ZERO);
+ BigDecimal totalWork =
+ estimateRangeEnd
+ .max(BigDecimal.valueOf(lastAttemptedOffset))
+ .subtract(BigDecimal.valueOf(range.getFrom()));
Review comment:
Doesn't this subtract need the DECIMAL128 math context?
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
##########
@@ -120,13 +136,27 @@ public String toString() {
public Progress getProgress() {
// If we have never attempted an offset, we return the length of the entire range as work
// remaining.
+ // Convert to BigDecimal in computation to prevent overflow, which may result in loss of
+ // precision.
if (lastAttemptedOffset == null) {
- return Progress.from(0, range.getTo() - range.getFrom());
+ return Progress.from(
+ 0,
+ BigDecimal.valueOf(range.getTo())
+ .subtract(BigDecimal.valueOf(range.getFrom()), MathContext.DECIMAL128)
+ .doubleValue());
}
// Compute the amount of work remaining from where we are to where we are attempting to get to
// with a minimum of zero in case we have claimed beyond the end of the range.
- long workRemaining = Math.max(range.getTo() - lastAttemptedOffset, 0);
- return Progress.from(range.getTo() - range.getFrom() - workRemaining, workRemaining);
+ BigDecimal workRemaining =
+ BigDecimal.valueOf(range.getTo())
+ .subtract(BigDecimal.valueOf(lastAttemptedOffset), MathContext.DECIMAL128)
+ .max(BigDecimal.ZERO);
+ BigDecimal wholeWork =
Review comment:
nit: `wholeWork` -> `totalWork`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 435694)
Time Spent: 3h 20m (was: 3h 10m)
> Build Kafka Read on top of Java SplittableDoFn
> ----------------------------------------------
>
> Key: BEAM-9977
> URL: https://issues.apache.org/jira/browse/BEAM-9977
> Project: Beam
> Issue Type: New Feature
> Components: io-java-kafka
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: P2
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)