[ 
https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435842&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435842
 ]

ASF GitHub Bot logged work on BEAM-9603:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/May/20 05:03
            Start Date: 21/May/20 05:03
    Worklog Time Spent: 10m 
      Work Description: boyuanzz commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r428438971



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -962,16 +971,25 @@ private Progress getProgress() {
             .build());
   }
 
-  private <K> void processTimer(String timerId, TimeDomain timeDomain, 
Timer<K> timer) {
+  private <K> void processTimer(
+      String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
     currentTimer = timer;
     currentTimeDomain = timeDomain;
     onTimerContext = new OnTimerContext<>(timer.getUserKey());
+    String timerId =
+        timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)

Review comment:
       Ack. Thanks!

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap<K> implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = 
elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = 
elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, 
doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
       I thought it would be good time to revisit the dynamic timer API design 
but it's not in the scope of this PR. Let's leave it as now. Thanks!

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -947,49 +910,213 @@ public void testTimers() throws Exception {
             timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)),
             timerInGlobalWindow("X", new Instant(1700L), new Instant(10022L)),
             timerInGlobalWindow("C", new Instant(1800L), new Instant(10022L)),
-            timerInGlobalWindow("B", new Instant(1900L), new 
Instant(10022L))));
+            timerInGlobalWindow("B", new Instant(1900L), new Instant(10022L)),
+            timerInGlobalWindow("B", new Instant(2000L), new Instant(10032L)),
+            timerInGlobalWindow("Y", new Instant(2100L), new 
Instant(10042L))));
+    assertThat(
+        fakeTimerClient.getTimers(eventFamilyTimer),
+        contains(
+            timerInGlobalWindow("X", "event-timer1", new Instant(1000L), new 
Instant(1003L)),
+            timerInGlobalWindow("Y", "event-timer1", new Instant(1100L), new 
Instant(1103L)),
+            timerInGlobalWindow("X", "event-timer1", new Instant(1200L), new 
Instant(1203L)),
+            timerInGlobalWindow("Y", "event-timer1", new Instant(1300L), new 
Instant(1303L)),
+            timerInGlobalWindow("A", "event-timer1", new Instant(1400L), new 
Instant(2413L)),
+            timerInGlobalWindow("B", "event-timer1", new Instant(1500L), new 
Instant(2513L)),
+            timerInGlobalWindow("A", "event-timer1", new Instant(1600L), new 
Instant(2613L)),
+            timerInGlobalWindow("X", "event-timer1", new Instant(1700L), new 
Instant(1723L)),
+            timerInGlobalWindow("C", "event-timer1", new Instant(1800L), new 
Instant(1823L)),
+            timerInGlobalWindow("B", "event-timer1", new Instant(1900L), new 
Instant(1923L)),
+            timerInGlobalWindow("B", "event-timer1", new Instant(2000L), new 
Instant(2033L)),
+            timerInGlobalWindow("Y", "event-timer1", new Instant(2100L), new 
Instant(2143L))));
+    assertThat(
+        fakeTimerClient.getTimers(processingFamilyTimer),
+        contains(
+            timerInGlobalWindow("X", "processing-timer1", new Instant(1000L), 
new Instant(10004L)),
+            timerInGlobalWindow("Y", "processing-timer1", new Instant(1100L), 
new Instant(10004L)),
+            timerInGlobalWindow("X", "processing-timer1", new Instant(1200L), 
new Instant(10004L)),
+            timerInGlobalWindow("Y", "processing-timer1", new Instant(1300L), 
new Instant(10004L)),
+            timerInGlobalWindow("A", "processing-timer1", new Instant(1400L), 
new Instant(10014L)),
+            timerInGlobalWindow("B", "processing-timer1", new Instant(1500L), 
new Instant(10014L)),
+            timerInGlobalWindow("A", "processing-timer1", new Instant(1600L), 
new Instant(10014L)),
+            timerInGlobalWindow("X", "processing-timer1", new Instant(1700L), 
new Instant(10024L)),
+            timerInGlobalWindow("C", "processing-timer1", new Instant(1800L), 
new Instant(10024L)),
+            timerInGlobalWindow("B", "processing-timer1", new Instant(1900L), 
new Instant(10024L)),
+            timerInGlobalWindow("B", "processing-timer1", new Instant(2000L), 
new Instant(10034L)),
+            timerInGlobalWindow(
+                "Y", "processing-timer1", new Instant(2100L), new 
Instant(10044L))));
     mainOutputValues.clear();
 
     assertFalse(fakeTimerClient.isOutboundClosed(eventTimer));
     assertFalse(fakeTimerClient.isOutboundClosed(processingTimer));
+    assertFalse(fakeTimerClient.isOutboundClosed(eventFamilyTimer));
+    assertFalse(fakeTimerClient.isOutboundClosed(processingFamilyTimer));
     fakeTimerClient.closeInbound(eventTimer);
     fakeTimerClient.closeInbound(processingTimer);
+    fakeTimerClient.closeInbound(eventFamilyTimer);
+    fakeTimerClient.closeInbound(processingFamilyTimer);
 
     Iterables.getOnlyElement(finishFunctionRegistry.getFunctions()).run();
     assertThat(mainOutputValues, empty());
 
     assertTrue(fakeTimerClient.isOutboundClosed(eventTimer));
     assertTrue(fakeTimerClient.isOutboundClosed(processingTimer));
+    assertTrue(fakeTimerClient.isOutboundClosed(eventFamilyTimer));
+    assertTrue(fakeTimerClient.isOutboundClosed(processingFamilyTimer));
 
     Iterables.getOnlyElement(teardownFunctions).run();
     assertThat(mainOutputValues, empty());
 
     assertEquals(
         ImmutableMap.<StateKey, ByteString>builder()
             .put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2", 
"processing"))
-            .put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2"))
+            .put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2", 
"processing-family"))
             .put(bagUserStateKey("bag", "A"), encode("A0", "event", "event"))
-            .put(bagUserStateKey("bag", "B"), encode("event", "processing"))
+            .put(bagUserStateKey("bag", "B"), encode("event", "processing", 
"event-family"))
             .put(bagUserStateKey("bag", "C"), encode("C0", "processing"))
             .build(),
         fakeStateClient.getData());
   }
 
+  private <K> org.apache.beam.runners.core.construction.Timer<K> 
timerInGlobalWindow(
+      K userKey, Instant holdTimestamp, Instant fireTimestamp) {
+    return timerInGlobalWindow(userKey, "", holdTimestamp, fireTimestamp);
+  }
+
   private <T> WindowedValue<T> valueInWindow(T value, BoundedWindow window) {
     return WindowedValue.of(value, window.maxTimestamp(), window, 
PaneInfo.NO_FIRING);
   }
 
   private <K> org.apache.beam.runners.core.construction.Timer<K> 
timerInGlobalWindow(

Review comment:
       dynamicTimerInGlobalWindow?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 435842)
    Time Spent: 4h 20m  (was: 4h 10m)

> Support Dynamic Timer in Java SDK over FnApi
> --------------------------------------------
>
>                 Key: BEAM-9603
>                 URL: https://issues.apache.org/jira/browse/BEAM-9603
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-harness
>            Reporter: Boyuan Zhang
>            Assignee: Yichi Zhang
>            Priority: P2
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to