[
https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=171132&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171132
]
ASF GitHub Bot logged work on BEAM-2939:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Nov/18 17:40
Start Date: 30/Nov/18 17:40
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #6969: [BEAM-2939]
SplittableDoFn Java SDK API Changes
URL: https://github.com/apache/beam/pull/6969
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (forked) pull request on
merge, the diff is displayed below for the sake of provenance:
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto
b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 39229b1fb528..4328fa393539 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -186,6 +186,15 @@ message ProcessBundleDescriptor {
org.apache.beam.model.pipeline.v1.ApiServiceDescriptor
state_api_service_descriptor = 7;
}
+
+// Represents a non-negative decimal number: unscaled_value * 10^(scale)
(scientific notation)
+message Decimal {
+ // Represents the unscaled value as a big endian unlimited precision
non-negative integer.
+ bytes unscaled_value = 1;
+ // Represents the scale
+ int32 scale = 2;
+}
+
// One of the applications specifying the scope of work for a bundle.
// See
https://docs.google.com/document/d/1tUDb45sStdR8u7-jBkGdw3OGFK7aa2-V7eo86zYSE_4/edit#heading=h.9g3g5weg2u9
for further details.
message BundleApplication {
@@ -219,10 +228,9 @@ message BundleApplication {
bytes partition = 1;
// The estimate for the backlog.
- oneof value {
- // Represents an estimate for the amount of outstanding work. Values
- // compare lexicographically.
- bytes bytes = 1000;
+ oneof backlog {
+ // Represents an estimate for the amount of outstanding work.
+ Decimal value = 1000;
// Whether the backlog is unknown.
bool is_unknown = 1001;
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index a3cbd703df06..b5822c125010 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -54,6 +54,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -480,6 +481,7 @@ public void processElement(final ProcessContext c) {
invoker.invokeSplitRestriction(
element,
c.element().getValue(),
+ Backlog.unknown(),
new OutputReceiver<RestrictionT>() {
@Override
public void output(RestrictionT part) {
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 261e86f23ccb..1b61575573a5 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -146,8 +146,8 @@ public void process(ProcessContext c, BoundedWindow w) {
invoker.invokeProcessElement(new NestedProcessContext<>(fn, c,
element, w, tracker));
if (continuation.shouldResume()) {
restriction = tracker.checkpoint();
- Uninterruptibles.sleepUninterruptibly(
- continuation.resumeDelay().getMillis(), TimeUnit.MILLISECONDS);
+ long sleepTimeMillis = continuation.resumeTime().getMillis() -
System.currentTimeMillis();
+ Uninterruptibles.sleepUninterruptibly(sleepTimeMillis,
TimeUnit.MILLISECONDS);
} else {
break;
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index a727ce910fb9..84b46691d3b0 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -385,8 +385,13 @@ public void processElement(final ProcessContext c) {
if (futureOutputWatermark == null) {
futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
}
+
+ // TODO(BEAM-6157): When we are getting a ProcessContinuation#STOP, why
are we setting another
+ // timer?
Instant wakeupTime =
-
timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
+
timerInternals.currentProcessingTime().isBefore(result.getContinuation().resumeTime())
+ ? result.getContinuation().resumeTime()
+ : timerInternals.currentProcessingTime();
holdState.add(futureOutputWatermark);
// Set a timer to continue processing this element.
timerInternals.setTimer(
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 07453e068a71..7f257e3a2f63 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -45,6 +45,7 @@
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
@@ -75,6 +76,7 @@
public class SplittableParDoProcessFnTest {
private static final int MAX_OUTPUTS_PER_BUNDLE = 10000;
private static final Duration MAX_BUNDLE_DURATION =
Duration.standardSeconds(5);
+ @Rule public final ResetDateTimeProvider dateTimeProvider = new
ResetDateTimeProvider();
// ----------------- Tests for whether the transform sets boundedness
correctly --------------
private static class SomeRestriction
@@ -338,6 +340,7 @@ public OffsetRangeTracker newTracker(OffsetRange range) {
public void testUpdatesWatermark() throws Exception {
DoFn<Instant, String> fn = new WatermarkUpdateFn();
Instant base = Instant.now();
+ dateTimeProvider.setDateTimeFixed(base.getMillis());
ProcessFnTester<Instant, String, OffsetRange, Long> tester =
new ProcessFnTester<>(
@@ -352,10 +355,12 @@ public void testUpdatesWatermark() throws Exception {
assertThat(tester.takeOutputElements(), hasItems("0", "1", "2"));
assertEquals(base.plus(Duration.standardSeconds(2)),
tester.getWatermarkHold());
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(1).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
assertThat(tester.takeOutputElements(), hasItems("3", "4", "5"));
assertEquals(base.plus(Duration.standardSeconds(5)),
tester.getWatermarkHold());
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(2).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
assertThat(tester.takeOutputElements(), hasItems("6", "7"));
assertEquals(null, tester.getWatermarkHold());
@@ -381,6 +386,7 @@ public SomeRestriction getInitialRestriction(Integer elem) {
public void testResumeSetsTimer() throws Exception {
DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
Instant base = Instant.now();
+ dateTimeProvider.setDateTimeFixed(base.getMillis());
ProcessFnTester<Integer, String, SomeRestriction, Void> tester =
new ProcessFnTester<>(
base,
@@ -394,18 +400,22 @@ public void testResumeSetsTimer() throws Exception {
assertThat(tester.takeOutputElements(), contains("42"));
// Should resume after 5 seconds: advancing by 3 seconds should have no
effect.
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(3).getMillis());
assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
assertTrue(tester.takeOutputElements().isEmpty());
// 6 seconds should be enough to invoke the fn again.
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(6).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
assertThat(tester.takeOutputElements(), contains("42"));
// Should again resume after 5 seconds: advancing by 3 seconds should
again have no effect.
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(9).getMillis());
assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
assertTrue(tester.takeOutputElements().isEmpty());
// 6 seconds should again be enough.
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(12).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
assertThat(tester.takeOutputElements(), contains("42"));
}
@@ -442,6 +452,7 @@ public OffsetRange getInitialRestriction(Integer elem) {
public void testResumeCarriesOverState() throws Exception {
DoFn<Integer, String> fn = new CounterFn(1);
Instant base = Instant.now();
+ dateTimeProvider.setDateTimeFixed(base.getMillis());
ProcessFnTester<Integer, String, OffsetRange, Long> tester =
new ProcessFnTester<>(
base,
@@ -453,14 +464,18 @@ public void testResumeCarriesOverState() throws Exception
{
tester.startElement(42, new OffsetRange(0, 3));
assertThat(tester.takeOutputElements(), contains("42"));
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(1).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
assertThat(tester.takeOutputElements(), contains("43"));
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(2).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
assertThat(tester.takeOutputElements(), contains("44"));
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(3).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
// After outputting all 3 items, should not output anything more.
assertEquals(0, tester.takeOutputElements().size());
// Should also not ask to resume.
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(4).getMillis());
assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
}
@@ -469,6 +484,7 @@ public void testCheckpointsAfterNumOutputs() throws
Exception {
int max = 100;
DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
Instant base = Instant.now();
+ dateTimeProvider.setDateTimeFixed(base.getMillis());
int baseIndex = 42;
ProcessFnTester<Integer, String, OffsetRange, Long> tester =
@@ -490,6 +506,7 @@ public void testCheckpointsAfterNumOutputs() throws
Exception {
assertThat(elements, hasItem(String.valueOf(baseIndex)));
assertThat(elements, hasItem(String.valueOf(baseIndex + max - 1)));
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(1).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
elements = tester.takeOutputElements();
assertEquals(max, elements.size());
@@ -497,6 +514,7 @@ public void testCheckpointsAfterNumOutputs() throws
Exception {
assertThat(elements, hasItem(String.valueOf(baseIndex + max)));
assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max - 1)));
+ dateTimeProvider.setDateTimeFixed(base.getMillis() +
Duration.standardSeconds(2).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
elements = tester.takeOutputElements();
assertEquals(max / 2, elements.size());
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
index fe6dd9ceeef8..a73581dc5abf 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
@@ -47,6 +47,7 @@
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -169,7 +170,7 @@ public OffsetRange getInitialRange(String element) {
@SplitRestriction
public void splitRange(
- String element, OffsetRange range, OutputReceiver<OffsetRange>
receiver) {
+ String element, OffsetRange range, Backlog backlog,
OutputReceiver<OffsetRange> receiver) {
long middle = (range.getFrom() + range.getTo()) / 2;
receiver.output(new OffsetRange(range.getFrom(), middle));
receiver.output(new OffsetRange(middle, range.getTo()));
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index b737a9ffef8a..73aa4297ec76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -38,6 +38,7 @@
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -744,13 +745,29 @@ public Duration getAllowedTimestampSkew() {
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
into multiple parts to
* be processed in parallel.
*
- * <p>Signature: {@code List<RestrictionT> splitRestriction( InputT element,
RestrictionT
- * restriction);}
+ * <p>The signature of this method must satisfy the following constraints:
+ *
+ * <ul>
+ * <li>One of its parameters must be the {@code InputT} element.
+ * <li>One of its parameters must be the restriction.
+ * <li>One of its input parameters must be of type {@link Backlog}.
Splitting the restriction
+ * should attempt to take the backlog information into account. If the
backlog is known,
+ * each split should return a restriction with an approximate amount
of work bounded by the
+ * backlog. In the case of an unbounded restriction, at most one of
the splits can represent
+ * the unbounded portion of work. If the backlog that is specified is
unknown, it is up to
+ * the SDK to choose a number of splits of approximately equally sized
portions with
+ * potentially one of those splits representing the unbounded portion
of work.
+ * <li>One of its parameters must be the output receiver for restrictions.
+ * </ul>
+ *
+ * <p>Signature: {@code splitRestriction(InputT element, RestrictionT
restriction, Backlog
+ * backlog, OutputReceiver<RestrictionT> receiver);}
*
* <p>Optional: if this method is omitted, the restriction will not be split
(equivalent to
- * defining the method and returning {@code
Collections.singletonList(restriction)}).
+ * defining the method and outputting only the given {@code restriction}).
*/
// TODO: Make the InputT parameter optional.
+ // TODO: Make the Backlog parameter optional.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@@ -797,7 +814,7 @@ public Duration getAllowedTimestampSkew() {
// This can't be put into ProcessContinuation itself due to the following
problem:
//
http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
- new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO);
+ new AutoValue_DoFn_ProcessContinuation(false, new Instant(0L));
/**
* When used as a return value of {@link ProcessElement}, indicates whether
there is more work to
@@ -816,7 +833,7 @@ public static ProcessContinuation stop() {
/** Indicates that there is more work to be done for the current element.
*/
public static ProcessContinuation resume() {
- return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO);
+ return new AutoValue_DoFn_ProcessContinuation(true, Instant.now());
}
/**
@@ -826,14 +843,26 @@ public static ProcessContinuation resume() {
public abstract boolean shouldResume();
/**
- * A minimum duration that should elapse between the end of this {@link
ProcessElement} call and
- * the {@link ProcessElement} call continuing processing of the same
element. By default, zero.
+ * A hint that is provided to runners about when execution of this element
and restriction pair
+ * should be scheduled. Runners will attempt to treat this as a lower
bound but may choose not
+ * to do so. By default, the execution should be scheduled immediately.
*/
- public abstract Duration resumeDelay();
+ public abstract Instant resumeTime();
- /** Builder method to set the value of {@link #resumeDelay()}. */
+ /**
+ * Returns a new {@link ProcessContinuation} like this one but with the
{@link #resumeTime()}
+ * set to {@code now() + resumeDelay}.
+ */
public ProcessContinuation withResumeDelay(Duration resumeDelay) {
- return new AutoValue_DoFn_ProcessContinuation(shouldResume(),
resumeDelay);
+ return this.withResumeTime(Instant.now().plus(resumeDelay));
+ }
+
+ /**
+ * Returns a new {@link ProcessContinuation} like this one but with the
{@link #resumeTime()}
+ * set to the specified value.
+ */
+ public ProcessContinuation withResumeTime(Instant resumeTime) {
+ return new AutoValue_DoFn_ProcessContinuation(shouldResume(),
resumeTime);
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 11fee7874d83..a68b2489a0c3 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -84,6 +84,7 @@
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.util.UserCodeException;
@@ -257,7 +258,10 @@ public void invokeOnTimer(
/** Doesn't split the restriction. */
@SuppressWarnings("unused")
public static <InputT, RestrictionT> void invokeSplitRestriction(
- InputT element, RestrictionT restriction,
DoFn.OutputReceiver<RestrictionT> receiver) {
+ InputT element,
+ RestrictionT restriction,
+ Backlog backlog,
+ DoFn.OutputReceiver<RestrictionT> receiver) {
receiver.output(restriction);
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 21d1653fa117..45cf9f42c906 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -32,6 +32,7 @@
import org.apache.beam.sdk.transforms.DoFn.StartBundle;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -86,6 +87,7 @@
<RestrictionT> void invokeSplitRestriction(
InputT element,
RestrictionT restriction,
+ Backlog backlog,
DoFn.OutputReceiver<RestrictionT> restrictionReceiver);
/** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 4c9b61b98d56..4f2b18c3b679 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -64,6 +64,7 @@
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -1182,18 +1183,24 @@ private static boolean
hasTimestampAnnotation(List<Annotation> annotations) {
errors.checkArgument(void.class.equals(m.getReturnType()), "Must return
void");
Type[] params = m.getGenericParameterTypes();
- errors.checkArgument(params.length == 3, "Must have exactly 3 arguments");
+ errors.checkArgument(params.length == 4, "Must have 4 arguments");
errors.checkArgument(
fnT.resolveType(params[0]).equals(inputT),
"First argument must be the element type %s",
formatType(inputT));
TypeDescriptor<?> restrictionT = fnT.resolveType(params[1]);
- TypeDescriptor<?> receiverT = fnT.resolveType(params[2]);
+ TypeDescriptor<?> backlogType = fnT.resolveType(params[2]);
+ errors.checkArgument(
+ backlogType.equals(TypeDescriptor.of(Backlog.class)),
+ "Third argument must be %s, but is %s",
+ formatType(TypeDescriptor.of(Backlog.class)),
+ formatType(backlogType));
+ TypeDescriptor<?> receiverT = fnT.resolveType(params[3]);
TypeDescriptor<?> expectedReceiverT = outputReceiverTypeOf(restrictionT);
errors.checkArgument(
receiverT.equals(expectedReceiverT),
- "Third argument must be %s, but is %s",
+ "Fourth argument must be %s, but is %s",
formatType(expectedReceiverT),
formatType(receiverT));
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Backlog.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Backlog.java
new file mode 100644
index 000000000000..2712f9b2ff86
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Backlog.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import com.google.auto.value.AutoValue;
+import java.math.BigDecimal;
+import javax.annotation.Nullable;
+
+/**
+ * A representation for the amount of known work represented as a backlog.
+ * Backlog values are {@link BigDecimal}s which should preferably represent a
+ * linear space and be comparable within the same partition.
+ * <p>It is up to each restriction tracker to convert between their natural
representation of
+ * outstanding work and this representation. For example:
+ *
+ * <ul>
+ * <li>Block based file source (e.g. Avro): From the end of the current
block, the remaining
+ * number of bytes to the end of the restriction represented as a big
endian 64 bit unsigned
+ * integer.
+ * <li>Pull based queue based source (e.g. Pubsub): The local/global backlog
available in number
+ * of messages or number of {@code messages / bytes} that have not been
processed represented
+ * as a big endian 64 bit unsigned integer.
+ * <li>Key range based source (e.g. Shuffle, Bigtable, ...): Scale the start
key to be one and end
+ * key to be zero and interpolate the position of the next splittable
key as the backlog. If
+ * information about the probability density function or cumulative
distribution function is
+ * available, backlog interpolation can be improved. Alternatively, if
the number of encoded
+ * bytes for the keys and values is known for the key range, the backlog
of remaining bytes
+ * can be used.
+ * </ul>
+ *
+ * <p>{@link RestrictionTracker}s should implement {@link Backlogs.HasBacklog}
to report a backlog
+ * where the element and restriction pair uniquely identify the resource.
Otherwise {@link
+ * RestrictionTracker}s should implement {@link
Backlogs.HasPartitionedBacklog} to report a backlog
+ * for a shared resource such as a message queue.
+ *
+ * <p>See <a
href="https://s.apache.org/beam-bundles-backlog-splitting">Bundles w/
SplittableDoFns:
+ * Backlog & Splitting</a> for further details.
+ */
+@AutoValue
+public abstract class Backlog {
+ private static final Backlog UNKNOWN_BACKLOG = new AutoValue_Backlog(null);
+
+ /** Returns a backlog represented by the specified {@link BigDecimal}. */
+ public static Backlog of(BigDecimal backlog) {
+ return new AutoValue_Backlog(backlog);
+ }
+
+ /** Returns an unknown backlog. */
+ public static Backlog unknown() {
+ return UNKNOWN_BACKLOG;
+ }
+
+ /** Returns whether this backlog is known or unknown. */
+ public boolean isUnknown() {
+ return backlogInternal() == null;
+ }
+
+ /**
+ * Returns the {@link BigDecimal} representation of the backlog if it is known.
+ *
+ * @throws IllegalStateException if the backlog is unknown.
+ */
+ public BigDecimal backlog() {
+ if (isUnknown()) {
+ throw new IllegalStateException("Backlog is unknown, there is no byte[]
representation.");
+ }
+ return backlogInternal();
+ }
+
+ @SuppressWarnings("mutable")
+ @Nullable
+ protected abstract BigDecimal backlogInternal();
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Backlogs.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Backlogs.java
new file mode 100644
index 000000000000..2bd21e6697c3
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Backlogs.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+/** Definitions and convenience methods for reporting/consuming/updating
backlogs. */
+public final class Backlogs {
+ /**
+ * {@link RestrictionTracker}s which can provide a backlog should implement
this interface.
+ * Implementations that do not implement this interface will be assumed to
have an unknown
+ * backlog.
+ *
+ * <p>By default, the backlog partition identifier is represented as the
encoded element and
+ * restriction pair. See {@link HasPartitionedBacklog} for {@link
RestrictionTracker}s that report
+ * backlogs over a shared resource.
+ */
+ public interface HasBacklog {
+ Backlog getBacklog();
+ }
+
+ /**
+ * {@link RestrictionTracker}s which can provide a backlog that is from a
shared resource such as
+ * a message queue should implement this interface to provide the partition
identifier. The
+ * partition identifier is used by runners for various backlog calculations.
Backlogs reported
+ * with the same partition identifier represent a point in time reporting of
the backlog for that
+ * partition. For example, a runner can compute a global backlog by summing
all reported backlogs
+ * over all unique partition identifiers.
+ *
+ * <p>For example, SplittableDoFns which consume elements from:
+ *
+ * <ul>
+ * <li>a globally shared resource such as a Pubsub queue should set this
to "".
+ * <li>a shared partitioned resource should use the partition identifier.
+ * <li>a uniquely partitioned resource such as a file range should set
this to file name + start
+ * offset. Note that the default for {@link RestrictionTracker}s is to
use the encoded
+ * element and restriction pair.
+ * </ul>
+ *
+ * <p>Returns an immutable representation of the partition identifier.
+ */
+ public interface HasPartitionedBacklog extends HasBacklog {
+ byte[] getBacklogPartition();
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index d596862b5acd..55e150fd0f7f 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -22,6 +22,13 @@
/**
* Manages access to the restriction and keeps track of its claimed part for a
<a
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ *
+ * <p>Restriction trackers which can provide an estimate for the known amount
of outstanding work
+ * should implement {@link Backlogs.HasBacklog} to provide information that
can be used during
+ * progress reporting and splitting by runners to improve the performance of
the pipeline and
+ * increase resource utilization. See <a
+ * href="https://s.apache.org/beam-bundles-backlog-splitting">Bundles w/
SplittableDoFns: Backlog
+ * & Splitting</a> for further details.
*/
public abstract class RestrictionTracker<RestrictionT, PositionT> {
/**
@@ -67,6 +74,4 @@
* work remaining in the restriction.
*/
public abstract void checkDone() throws IllegalStateException;
-
- // TODO: Add the more general splitRemainderAfterFraction() and other
methods.
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
new file mode 100644
index 000000000000..fd10f57459fe
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+/** Definitions and convenience methods for working with restrictions. */
+public final class Restrictions {
+
+ /**
+ * By default all restrictions are assumed to be unbounded and it is
expected that SplittableDoFn
+ * authors mark their restriction type with this interface if the
restriction produces a bounded
+ * amount of output.
+ */
+ interface IsBounded {}
+}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index a347012ecc2b..08a26cb90c45 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -46,6 +46,7 @@
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -97,7 +98,7 @@ public OffsetRange getInitialRange(String element) {
@SplitRestriction
public void splitRange(
- String element, OffsetRange range, OutputReceiver<OffsetRange>
receiver) {
+ String element, OffsetRange range, Backlog backlog,
OutputReceiver<OffsetRange> receiver) {
receiver.output(new OffsetRange(range.getFrom(), (range.getFrom() +
range.getTo()) / 2));
receiver.output(new OffsetRange((range.getFrom() + range.getTo()) / 2,
range.getTo()));
}
@@ -693,7 +694,7 @@ public OffsetRange getInitialRestriction(String value) {
@SplitRestriction
public void splitRestriction(
- String value, OffsetRange range, OutputReceiver<OffsetRange> receiver)
{
+ String value, OffsetRange range, Backlog backlog,
OutputReceiver<OffsetRange> receiver) {
assertEquals(State.OUTSIDE_BUNDLE, state);
receiver.output(range);
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 0d2be5a2d536..5e6a040b37a3 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -48,11 +48,13 @@
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
import
org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -75,6 +77,7 @@
@RunWith(JUnit4.class)
public class DoFnInvokersTest {
@Rule public ExpectedException thrown = ExpectedException.none();
+ @Rule public ResetDateTimeProvider dateTimeProvider = new
ResetDateTimeProvider();
@Mock private DoFn<String, String>.StartBundleContext mockStartBundleContext;
@Mock private DoFn<String, String>.FinishBundleContext
mockFinishBundleContext;
@@ -315,6 +318,9 @@ public void onWindowExpiration(BoundedWindow window) {}
@Test
public void testDoFnWithReturn() throws Exception {
+ // We have to set the date time since computing "resume()" is dependent on
system time.
+ dateTimeProvider.setDateTimeFixed(123456789);
+
class MockFn extends DoFn<String, String> {
@DoFn.ProcessElement
public ProcessContinuation processElement(
@@ -406,7 +412,10 @@ public SomeRestriction getInitialRestriction(String
element) {
@SplitRestriction
public void splitRestriction(
- String element, SomeRestriction restriction,
OutputReceiver<SomeRestriction> receiver) {}
+ String element,
+ SomeRestriction restriction,
+ Backlog backlog,
+ OutputReceiver<SomeRestriction> receiver) {}
@NewTracker
public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
@@ -421,6 +430,9 @@ public SomeRestrictionCoder getRestrictionCoder() {
@Test
public void testSplittableDoFnWithAllMethods() throws Exception {
+ // We have to set the date time since computing "resume()" is dependent on
system time.
+ dateTimeProvider.setDateTimeFixed(100000L);
+
MockFn fn = mock(MockFn.class);
DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class);
@@ -439,6 +451,7 @@ public void testSplittableDoFnWithAllMethods() throws
Exception {
public void splitRestriction(
String element,
SomeRestriction restriction,
+ Backlog backlog,
DoFn.OutputReceiver<SomeRestriction> receiver) {
receiver.output(part1);
receiver.output(part2);
@@ -446,7 +459,7 @@ public void splitRestriction(
}
}))
.when(fn)
- .splitRestriction(eq("blah"), same(restriction), Mockito.any());
+ .splitRestriction(eq("blah"), same(restriction),
eq(Backlog.unknown()), Mockito.any());
when(fn.newTracker(restriction)).thenReturn(tracker);
when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume());
@@ -456,6 +469,7 @@ public void splitRestriction(
invoker.invokeSplitRestriction(
"blah",
restriction,
+ Backlog.unknown(),
new OutputReceiver<SomeRestriction>() {
@Override
public void output(SomeRestriction output) {
@@ -469,6 +483,7 @@ public void outputWithTimestamp(SomeRestriction output,
Instant timestamp) {
});
assertEquals(Arrays.asList(part1, part2, part3), outputs);
assertEquals(tracker, invoker.invokeNewTracker(restriction));
+
assertEquals(
resume(),
invoker.invokeProcessElement(
@@ -554,6 +569,7 @@ public RestrictionWithDefaultTracker
getInitialRestriction(String element) {
invoker.invokeSplitRestriction(
"blah",
"foo",
+ Backlog.unknown(),
new DoFn.OutputReceiver<String>() {
private boolean invoked;
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 317b1453f5ce..0115494139b4 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -31,6 +31,7 @@
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -243,7 +244,10 @@ public SomeRestriction getInitialRestriction(Integer
element) {
@SplitRestriction
public void splitRestriction(
- Integer element, SomeRestriction restriction,
OutputReceiver<SomeRestriction> receiver) {}
+ Integer element,
+ SomeRestriction restriction,
+ Backlog backlog,
+ OutputReceiver<SomeRestriction> receiver) {}
@NewTracker
public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
@@ -287,7 +291,10 @@ public RestrictionT getInitialRestriction(Integer element)
{
@SplitRestriction
public void splitRestriction(
- Integer element, RestrictionT restriction,
OutputReceiver<RestrictionT> receiver) {}
+ Integer element,
+ RestrictionT restriction,
+ Backlog backlog,
+ OutputReceiver<RestrictionT> receiver) {}
@NewTracker
public TrackerT newTracker(RestrictionT restriction) {
@@ -447,14 +454,39 @@ public KvCoder getRestrictionCoder() {
@Test
public void testSplitRestrictionReturnsWrongType() throws Exception {
thrown.expectMessage(
- "Third argument must be DoFn.OutputReceiver<SomeRestriction>, "
+ "Fourth argument must be DoFn.OutputReceiver<SomeRestriction>, "
+ "but is DoFn.OutputReceiver<String>");
DoFnSignatures.analyzeSplitRestrictionMethod(
errors(),
TypeDescriptor.of(FakeDoFn.class),
new AnonymousMethod() {
void method(
- Integer element, SomeRestriction restriction,
DoFn.OutputReceiver<String> receiver) {}
+ Integer element,
+ SomeRestriction restriction,
+ Backlog backlog,
+ DoFn.OutputReceiver<String> receiver) {}
+ }.getMethod(),
+ TypeDescriptor.of(Integer.class));
+ }
+
+ @Test
+ public void testSplitRestrictionWrongBacklogArgument() throws Exception {
+ class BadFn {
+ private List<SomeRestriction> splitRestriction(String element,
SomeRestriction restriction) {
+ return null;
+ }
+ }
+
+ thrown.expectMessage("Third argument must be Backlog, but is String");
+ DoFnSignatures.analyzeSplitRestrictionMethod(
+ errors(),
+ TypeDescriptor.of(FakeDoFn.class),
+ new AnonymousMethod() {
+ void method(
+ Integer element,
+ SomeRestriction restriction,
+ String notBacklog,
+ DoFn.OutputReceiver<SomeRestriction> receiver) {}
}.getMethod(),
TypeDescriptor.of(Integer.class));
}
@@ -475,6 +507,7 @@ public void testSplitRestrictionWrongElementArgument()
throws Exception {
void method(
String element,
SomeRestriction restriction,
+ Backlog backlog,
DoFn.OutputReceiver<SomeRestriction> receiver) {}
}.getMethod(),
TypeDescriptor.of(Integer.class));
@@ -482,7 +515,7 @@ void method(
@Test
public void testSplitRestrictionWrongNumArguments() throws Exception {
- thrown.expectMessage("Must have exactly 3 arguments");
+ thrown.expectMessage("Must have 4 arguments");
DoFnSignatures.analyzeSplitRestrictionMethod(
errors(),
TypeDescriptor.of(FakeDoFn.class),
@@ -490,6 +523,7 @@ public void testSplitRestrictionWrongNumArguments() throws
Exception {
private void method(
Integer element,
SomeRestriction restriction,
+ Backlog backlog,
DoFn.OutputReceiver<SomeRestriction> receiver,
Object extra) {}
}.getMethod(),
@@ -519,6 +553,7 @@ public SomeRestriction getInitialRestriction(Integer
element) {
public void splitRestriction(
Integer element,
OtherRestriction restriction,
+ Backlog backlog,
OutputReceiver<OtherRestriction> receiver) {}
}
@@ -526,7 +561,7 @@ public void splitRestriction(
"getInitialRestriction(Integer): Uses restriction type
SomeRestriction, "
+ "but @SplitRestriction method ");
thrown.expectMessage(
- "splitRestriction(Integer, OtherRestriction, OutputReceiver) "
+ "splitRestriction(Integer, OtherRestriction, Backlog, OutputReceiver) "
+ "uses restriction type OtherRestriction");
DoFnSignatures.getSignature(BadFn.class);
}
@@ -544,7 +579,10 @@ public SomeRestriction getInitialRestriction(Integer
element) {
@SplitRestriction
public void splitRestriction(
- Integer element, SomeRestriction restriction,
OutputReceiver<SomeRestriction> receiver) {}
+ Integer element,
+ SomeRestriction restriction,
+ Backlog backlog,
+ OutputReceiver<SomeRestriction> receiver) {}
@NewTracker
public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
index 1cfc852f466b..641a7e449c5e 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
@@ -245,9 +245,7 @@ public void outputWindowedValue(
DelayedBundleApplication.newBuilder()
.setApplication(residualApplication)
.setRequestedExecutionTime(
- Timestamps.fromMillis(
- System.currentTimeMillis()
- +
result.getContinuation().resumeDelay().getMillis()))
+
Timestamps.fromMillis(result.getContinuation().resumeTime().getMillis()))
.build()));
}
}
diff --git
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
index dc99b769ee47..801a19028acd 100644
---
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
+++
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
@@ -24,6 +24,7 @@
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -85,7 +86,7 @@ public ByteKeyRange getInitialRestriction(HBaseQuery query) {
@SplitRestriction
public void splitRestriction(
- HBaseQuery query, ByteKeyRange range, OutputReceiver<ByteKeyRange>
receiver)
+ HBaseQuery query, ByteKeyRange range, Backlog backlog,
OutputReceiver<ByteKeyRange> receiver)
throws Exception {
List<HRegionLocation> regionLocations =
HBaseUtils.getRegionLocations(connection, query.getTableId(),
query.getScan());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 171132)
Time Spent: 11h 50m (was: 11h 40m)
> Fn API streaming SDF support
> ----------------------------
>
> Key: BEAM-2939
> URL: https://issues.apache.org/jira/browse/BEAM-2939
> Project: Beam
> Issue Type: Improvement
> Components: beam-model
> Reporter: Henning Rohde
> Assignee: Luke Cwik
> Priority: Major
> Labels: portability
> Time Spent: 11h 50m
> Remaining Estimate: 0h
>
> The Fn API should support streaming SDF. Detailed design TBD.
> Once design is ready, expand subtasks similarly to BEAM-2822.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)