[ 
https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=155610&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155610
 ]

ASF GitHub Bot logged work on BEAM-2953:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Oct/18 21:08
            Start Date: 17/Oct/18 21:08
    Worklog Time Spent: 10m 
      Work Description: akedin commented on a change in pull request #6540: 
[BEAM-2953] Advanced Timeseries examples.
URL: https://github.com/apache/beam/pull/6540#discussion_r226095495
 
 

 ##########
 File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/transforms/OrderOutput.java
 ##########
 @@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.timeseries.transforms;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.util.Durations;
+import com.google.protobuf.util.Timestamps;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.extensions.timeseries.TimeSeriesOptions;
+import org.apache.beam.sdk.extensions.timeseries.configuration.TSConfiguration;
+import org.apache.beam.sdk.extensions.timeseries.protos.TimeSeriesData;
+import org.apache.beam.sdk.extensions.timeseries.utils.TSAccums;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.*;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Create ordered output from the fixed windowed aggregations. */
+@SuppressWarnings("serial")
+@Experimental
+public class OrderOutput
+    extends PTransform<
+        PCollection<KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>>,
+        PCollection<KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OrderOutput.class);
+
+  @Override
+  public PCollection<KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>> expand(
+      PCollection<KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>> input) {
+
+    TSConfiguration options =
+        TSConfiguration.createConfigurationFromOptions(
+            input.getPipeline().getOptions().as(TimeSeriesOptions.class));
+
+    // Move into Global Time Domain, this allows Keyed State to retain its 
value across windows.
+    // Late Data is dropped at this stage.
+
+    PCollection<KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>> 
windowNoLateData =
+        input.apply(
+            "Global Window",
+            Window.<KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>>into(new 
GlobalWindows())
+                .withAllowedLateness(Duration.ZERO));
+
+    return windowNoLateData
+        .apply(ParDo.of(new GetPreviousData(options)))
+        .apply(
+            "Re Window post Global",
+            Window.<KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>>into(
+                    FixedWindows.of(options.downSampleDuration()))
+                // TODO: DirectRunner not showing results with exact late date 
match
+                
//.withAllowedLateness(options.downSampleDuration().plus(options.downSampleDuration()))
+                .withAllowedLateness(Duration.standardDays(1))
+                .discardingFiredPanes());
+  }
+
+  /**
+   * When a new key is seen (state == null) for the first time, we will create 
a timer to fire in
+   * the next window boundary. If this is not the first time the key is seen 
we check the ttl to see
+   * if a new timer is required. In-between timers firing, we will add all new 
elements to a List.
+   * let's have 3 elements coming in at various times and then there is NULL for 
fourth time slice
+   * [t1,t2,t3, NULL] t1 arrives and we set timer to fire at 
t1.plus(downsample duration + fixed
+   * offset).
+   *
+   * <p>Then we have state.set(t1). t2 arrives and we add t1 to t2 as the 
previous value and we
+   * output the value state.set(t2)
+   *
+   * <p>t3 arrives and we add t2 to t3 as the previous value and we output the 
value state.set(t3)
+   *
+   * <p>at time 4 we have no entry, here we use the last known state which is 
t3 and we change the
+   * time values.
+   *
+   * <p>NOTE: Need to see if this is really needed. In case there is more than 
one element arriving
+   * before the timer fires, we will also hold a List of elements. If the list 
of elements is > 0,
+   * then we loop through the core logic until the list is exhausted.
+   */
+  @Experimental
+  public static class GetPreviousData
+      extends DoFn<
+          KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>,
+          KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>> {
+
+    TSConfiguration options;
+
+    private GetPreviousData(TSConfiguration configuration) {
+      this.options = configuration;
+    }
+
+    // Setup our state objects
+
+    @StateId("lastKnownState")
+    private final StateSpec<ValueState<TimeSeriesData.TSAccum>> lastKnownState 
=
+        StateSpecs.value(ProtoCoder.of(TimeSeriesData.TSAccum.class));
+
+    @StateId("holdingList")
+    private final StateSpec<BagState<TimeSeriesData.TSAccum>> holdingList =
+        StateSpecs.bag(ProtoCoder.of(TimeSeriesData.TSAccum.class));
+
+    @TimerId("alarm")
+    private final TimerSpec alarm = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    /**
+     * This is the simple path... A new element is here so we add it to the 
list of elements and set
+     * a timer
+     *
+     * @param c
+     * @param holdingList
+     * @param timer
+     */
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @StateId("holdingList") BagState<TimeSeriesData.TSAccum> holdingList,
+        @TimerId("alarm") Timer timer) {
+
+      Instant alarm = c.timestamp().plus(options.downSampleDuration());
+
+      timer.set(alarm);
 
 Review comment:
   Do I understand it right that for all incoming elements we will set alarms?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 155610)
    Time Spent: 3h 20m  (was: 3h 10m)

> Timeseries processing extensions using state API
> ------------------------------------------------
>
>                 Key: BEAM-2953
>                 URL: https://issues.apache.org/jira/browse/BEAM-2953
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-ideas
>    Affects Versions: 2.7.0
>            Reporter: Reza ardeshir rokni
>            Assignee: Reuven Lax
>            Priority: Minor
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> A general set of timeseries transforms that shield the user from some of the 
> common problems encountered when processing timeseries data with Beam (in 
> streaming or batch mode).
> BEAM can be used to build out some very interesting pre-processing stages for 
> time series data. Some examples that will be useful:
>  - Downsampling time series based on simple MIN, MAX, COUNT, SUM, LAST, FIRST
>  - Creating a value for each downsampled window even if no value has been 
> emitted for the specific key. 
>  - Loading each downsampled value with the previous value (used in FX, where 
> the previous close is brought into the current open value)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to