[ 
https://issues.apache.org/jira/browse/BEAM-6857?focusedWorklogId=372797&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-372797
 ]

ASF GitHub Bot logged work on BEAM-6857:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Jan/20 05:40
            Start Date: 16/Jan/20 05:40
    Worklog Time Spent: 10m 
      Work Description: rehmanmuradali commented on pull request #10316: 
[BEAM-6857] Support Dynamic Timers
URL: https://github.com/apache/beam/pull/10316#discussion_r367239320
 
 

 ##########
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
 ##########
 @@ -285,13 +308,27 @@ public static TimerData windmillTimerToTimerData(
    * <p>This is necessary because Windmill will deduplicate based only on this 
tag.
    */
   public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData 
timerData) {
-    String tagString =
-        new StringBuilder()
-            .append(prefix.byteString().toStringUtf8()) // this never ends 
with a slash
-            .append(timerData.getNamespace().stringKey()) // this must begin 
and end with a slash
-            .append('+')
-            .append(timerData.getTimerId()) // this is arbitrary; currently 
unescaped
-            .toString();
+    String tagString;
+    // Timers without timerFamily would have timerFamily would be an empty 
string
+    if ("".equals(timerData.getTimerFamilyId())) {
+      tagString =
+          new StringBuilder()
+              .append(prefix.byteString().toStringUtf8()) // this never ends 
with a slash
+              .append(timerData.getNamespace().stringKey()) // this must begin 
and end with a slash
+              .append('+')
+              .append(timerData.getTimerId()) // this is arbitrary; currently 
unescaped
+              .toString();
+    } else {
+      tagString =
+          new StringBuilder()
+              .append(prefix.byteString().toStringUtf8()) // this never ends 
with a slash
+              .append(timerData.getNamespace().stringKey()) // this must begin 
and end with a slash
+              .append('+')
+              .append(timerData.getTimerId()) // this is arbitrary; currently 
unescaped
+              .append('+')
+              .append(timerData.getTimerFamilyId())
+              .toString();
+    }
 
 Review comment:
   > Overall this looks good now! Good job debugging the overlapping timers.
   > 
   > I think the only thing that remaining will be to add a UsesTimerMap test 
category so that we can exclude the runners that don't implement this. I'll 
trigger ValidatesRunner for those runners, though I expect them to break.
   
   Thank you @reuvenlax  for sparing your time to guide me whenever I get 
stuck. I am adding a new category for TImerMap. Seems like I only need to 
exclude Flink runner for this category.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 372797)
    Time Spent: 18h 50m  (was: 18h 40m)

> Support dynamic timers
> ----------------------
>
>                 Key: BEAM-6857
>                 URL: https://issues.apache.org/jira/browse/BEAM-6857
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Shehzaad Nakhoda
>            Priority: Major
>          Time Spent: 18h 50m
>  Remaining Estimate: 0h
>
> The Beam timers API currently requires each timer to be statically specified 
> in the DoFn. The user must provide a separate callback method per timer. For 
> example:
>  
> {code:java}
> DoFn<String, String>()
> {   
>   @TimerId("timer1") 
>   private final TimerSpec timer1 = TimerSpecs.timer(...);   
>   @TimerId("timer2") 
>   private final TimerSpec timer2 = TimerSpecs.timer(...);                 
>   ...... set timers in processElement    
>   @OnTimer("timer1") 
>   public void onTimer1() { .....}
>   @OnTimer("timer2") 
>   public void onTimer2() {....}
> }
> {code}
>  
> However there are many cases where the user does not know the set of timers 
> statically when writing their code. This happens when the timer tag should be 
> based on the data. It also happens when writing a DSL on top of Beam, where 
> the DSL author has to create DoFns but does not know statically which timers 
> their users will want to set (e.g. Scio).
>  
> The goal is to support dynamic timers. Something as follows;
>  
> {code:java}
> DoFn<String, String>() 
> {
>   @TimerId("timer") 
>   private final TimerSpec timer1 = TimerSpecs.dynamicTimer(...);
>   @ProcessElement process(@TimerId("timer") DynamicTimer timer)
>   {
>        timer.set("tag1'", ts);       
>        timer.set("tag2", ts);     
>   }
>   @OnTimer("timer") 
>   public void onTimer1(@TimerTag String tag) { .....}
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to