[
https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=151488&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151488
]
ASF GitHub Bot logged work on BEAM-2953:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Oct/18 01:48
Start Date: 05/Oct/18 01:48
Worklog Time Spent: 10m
Work Description: rezarokni commented on a change in pull request #6540:
[BEAM-2953] Advanced Timeseries examples.
URL: https://github.com/apache/beam/pull/6540#discussion_r222873827
##########
File path:
examples/java/src/main/java/org/apache/beam/examples/timeseries/TimeSeriesExampleToBigTable.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples.timeseries;
+
+import com.google.cloud.bigtable.beam.CloudBigtableIO;
+import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
+import com.google.protobuf.util.Timestamps;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.examples.timeseries.configuration.TSConfiguration;
+import org.apache.beam.examples.timeseries.protos.TimeSeriesData;
+import org.apache.beam.examples.timeseries.transforms.*;
+import org.apache.beam.examples.timeseries.utils.TSAccumSequences;
+import org.apache.beam.examples.timeseries.utils.TSAccums;
+import org.apache.beam.examples.timeseries.utils.TSMultiVariateDataPoints;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This example pipeline illustrates an advanced use of keyed state and timers. The
+ * pipeline extracts interesting information from timeseries data. One of the key elements is the
+ * transfer of data between fixed windows for a given key, as well as backfill when a key does not
+ * have any new data within a time boundary. This sample should not be used in production.
+ *
+ * <p>The output of this pipeline is to Google Cloud Bigtable.
+ */
+@Experimental
+public class TimeSeriesExampleToBigTable {
+
+  static final Logger LOG = LoggerFactory.getLogger(TimeSeriesExampleToBigTable.class);
+
+ static final String FILE_LOCATION = "/tmp/tf/";
+
+  /** Pushes timeseries data into Cloud Bigtable. */
+ public static void main(String[] args) {
+
+ // Create pipeline
+    TimeSeriesOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(TimeSeriesOptions.class);
+
+    TSConfiguration configuration =
+        TSConfiguration.builder()
+            .downSampleDuration(Duration.standardSeconds(5))
+            .timeToLive(Duration.standardMinutes(1))
+            .fillOption(TSConfiguration.BFillOptions.LAST_KNOWN_VALUE)
+            .build();
+
+ Pipeline p = Pipeline.create(options);
+
+ // [START bigtable_dataflow_connector_config]
+
+ CloudBigtableTableConfiguration config =
+ new CloudBigtableTableConfiguration.Builder()
+ .withProjectId(options.getProjectId())
+ .withInstanceId(options.getBigTableInstanceId())
+ .withTableId(options.getBigTableTableId())
+ .build();
+
+ // [END bigtable_dataflow_connector_config]
+
+ // ------------ READ DATA ------------
+
+ // Read some dummy timeseries data
+    PCollection<KV<TimeSeriesData.TSKey, TimeSeriesData.TSDataPoint>> readData =
+        p.apply(Create.of(SinWaveSample.generateSinWave()))
+            .apply(ParDo.of(new TSMultiVariateDataPoints.ExtractTimeStamp()))
+            .apply(ParDo.of(new TSMultiVariateDataPoints.ConvertMultiToUniDataPoint()));
+
+    // ------------ Create perfect rectangles of data --------
+
+    PCollection<KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>> downSampled =
+        readData.apply(new ExtractAggregates(configuration)).apply(new GetWindowData());
+
+ PCollection<KV<TimeSeriesData.TSKey, TimeSeriesData.TSAccum>> weHaveOrder =
+ downSampled.apply(new OrderOutput(configuration));
+
+    // ------------ Output data as logs and Bigtable rows --------
+
+    // This transform exists purely for logged debug output; it will fail with OOM if a large
+    // dataset is used.
+    weHaveOrder.apply(new DebugSortedResult());
+
+    // Write each TSAccum to Bigtable
+ weHaveOrder
+ .apply(ParDo.of(new GetValueFromKV<>()))
+ .apply(new TSAccums.OutPutToBigTable())
+ .apply(CloudBigtableIO.writeToTable(config));
+
+    // Create 3 different window lengths for the sequence output
+ weHaveOrder
+ .apply(
+ new TSAccumToFixedWindowSeq(
+                "Sequence of 1 Min", configuration, Duration.standardMinutes(1)))
+ .apply(ParDo.of(new GetValueFromKV<>()))
+ .apply(new TSAccumSequences.OutPutToBigTable())
+ .apply(CloudBigtableIO.writeToTable(config));
+
+ weHaveOrder
+ .apply(
+ new TSAccumToFixedWindowSeq(
+                "Sequence of 5 Min", configuration, Duration.standardMinutes(5)))
+ .apply(ParDo.of(new GetValueFromKV<>()))
+ .apply(new TSAccumSequences.OutPutToBigTable())
+ .apply(CloudBigtableIO.writeToTable(config));
+
+ weHaveOrder
+ .apply(
+ new TSAccumToFixedWindowSeq(
+                "Sequence of 15 Min", configuration, Duration.standardMinutes(15)))
+ .apply(ParDo.of(new GetValueFromKV<>()))
+ .apply(new TSAccumSequences.OutPutToBigTable())
+ .apply(CloudBigtableIO.writeToTable(config));
+
+ p.run();
+ }
+
+  /** Simple data generator that creates some dummy test data for the timeseries examples. */
+ public static class SinWaveSample {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SinWaveSample.class);
+
+    public static List<TimeSeriesData.TSMultiVariateDataPoint> generateSinWave() {
+
+ double y;
+ double yBase = 1;
+ double scale = 20;
+
+      List<TimeSeriesData.TSMultiVariateDataPoint> dataPoints = new ArrayList<>();
+
+ for (int k = 0; k < 10; k++) {
+
+ Instant now = Instant.parse("2018-01-01T00:00Z");
+
+ for (int i = 0; i < 1000; i++) {
+
+        if (i % 10 != 0) {
+
+ Instant dataPointTimeStamp = now.plus(Duration.standardSeconds(i));
+
+ y = (yBase - Math.sin(Math.toRadians(i)) * scale);
+
+          TimeSeriesData.TSMultiVariateDataPoint mvts =
+              TimeSeriesData.TSMultiVariateDataPoint.newBuilder()
+                  .setKey(TimeSeriesData.TSKey.newBuilder().setMajorKey("Sin-" + k).build())
+                  .putData("x", TimeSeriesData.Data.newBuilder().setIntVal(i).build())
Review comment:
I do have a utility class for each of the main protos, in a similar style to
protobuf's Timestamp, which has its utils in Timestamps. Each of the protos has
a corresponding util class (named with an "s" suffix).
I do have a factory for creating a TimeSeriesData.Data object, which I will
reuse here.
https://github.com/rezarokni/beam/blob/c492cbd74981e2c9c3ae206cb7a7b83b3a487e11/examples/java/src/main/java/org/apache/beam/examples/timeseries/utils/TSDatas.java#L241
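For illustration only, the per-proto utility pattern the comment describes (mirroring protobuf's `Timestamps` helper class) could look roughly like the sketch below. The `Data` class here is a plain stand-in for the generated `TimeSeriesData.Data` message, and the method names are hypothetical, not the actual `TSDatas` API:

```java
// Hypothetical sketch of a TSDatas-style factory, mirroring the protobuf
// Timestamps utility pattern mentioned in the review comment. Data is a
// plain stand-in for the generated TimeSeriesData.Data proto so the
// sketch stays self-contained.
public class TSDatasSketch {

  // Minimal stand-in for the generated proto message.
  public static final class Data {
    public final long intVal;
    public final double doubleVal;

    private Data(long intVal, double doubleVal) {
      this.intVal = intVal;
      this.doubleVal = doubleVal;
    }
  }

  // Overloaded factory methods, one per primitive value type, so call
  // sites read as TSDatasSketch.createData(i) instead of repeating the
  // builder chain everywhere.
  public static Data createData(long value) {
    return new Data(value, 0d);
  }

  public static Data createData(double value) {
    return new Data(0L, value);
  }
}
```

A call site such as `.putData("x", TSDatasSketch.createData(i))` then hides the builder boilerplate behind the overload resolution, which is the reuse the comment is pointing at.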
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 151488)
Time Spent: 2.5h (was: 2h 20m)
> Create more advanced Timeseries processing examples using state API
> -------------------------------------------------------------------
>
> Key: BEAM-2953
> URL: https://issues.apache.org/jira/browse/BEAM-2953
> Project: Beam
> Issue Type: Improvement
> Components: examples-java
> Affects Versions: 2.1.0
> Reporter: Reza ardeshir rokni
> Assignee: Reuven Lax
> Priority: Minor
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> As described in the phase 1 portion of this solution outline:
> https://cloud.google.com/solutions/correlating-time-series-dataflow
> Beam can be used to build some very interesting pre-processing stages for
> time series data. Some examples that will be useful:
> - Downsampling time series based on simple AVG, MIN, MAX
> - Creating a value for each time window using GenerateSequence as a seed
> - Loading the value of a downsample with the previous value (used in FX, with the
> previous close being brought into the current open value)
> This will show some concrete examples of keyed state as well as the use of
> combiners.
> The samples can also be used to show how to create an ordered list of
> values per key from an unbounded topic which has multiple time series keys.
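The downsample-plus-backfill idea in the list above can be sketched without Beam at all. This hypothetical helper buckets timestamped points into fixed windows, averages each window, and carries the last known average forward into empty windows, analogous to the LAST_KNOWN_VALUE fill option in the example pipeline (the method name and shape are illustrative, not part of the PR):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Beam-free sketch of the downsample + backfill idea:
// average values within fixed windows, then fill any empty window with
// the last known value (what the pipeline does with keyed state).
public class BackfillSketch {

  // points: {timestampSeconds, value} pairs, assumed non-empty.
  // Returns windowIndex -> value for every window between the first and
  // last observed window, with gaps filled by the previous average.
  public static Map<Long, Double> downsampleWithBackfill(
      List<long[]> points, long windowSeconds) {
    TreeMap<Long, double[]> sums = new TreeMap<>(); // window -> {sum, count}
    for (long[] p : points) {
      double[] acc = sums.computeIfAbsent(p[0] / windowSeconds, w -> new double[2]);
      acc[0] += p[1];
      acc[1] += 1;
    }
    Map<Long, Double> out = new LinkedHashMap<>();
    Double lastKnown = null;
    for (long w = sums.firstKey(); w <= sums.lastKey(); w++) {
      double[] acc = sums.get(w);
      if (acc != null) {
        lastKnown = acc[0] / acc[1]; // simple AVG downsample
      }
      out.put(w, lastKnown); // empty windows reuse the last known value
    }
    return out;
  }
}
```

Swapping the AVG for MIN or MAX only changes the accumulator step; the backfill loop stays the same, which is why the Beam version can share one OrderOutput transform across all downsample types.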
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)