[ 
https://issues.apache.org/jira/browse/BEAM-3200?focusedWorklogId=314577&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-314577
 ]

ASF GitHub Bot logged work on BEAM-3200:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Sep/19 18:56
            Start Date: 18/Sep/19 18:56
    Worklog Time Spent: 10m 
      Work Description: bcbradle commented on issue #9556: [BEAM-3200, BEAM-3772] Fix: Honor create and write disposition with dynamic destinations
URL: https://github.com/apache/beam/pull/9556#issuecomment-532816447
 
 
    Data pipeline that (eventually) produces the erroneous behavior
    ```
   
   package com.awair.iot.timeseries.datapipeline.bigquery.write.devicemetricsraw
   import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}
   import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, BigQueryUtils}
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
   import org.apache.beam.sdk.options.PipelineOptionsFactory
   import org.apache.beam.sdk.schemas.Schema
   import org.apache.beam.sdk.transforms.{Filter, FlatMapElements}
   import org.joda.time.Duration
   
   object DeviceMetricsRaw {
     import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
   
     trait PipelineOptions extends DataflowPipelineOptions {
       def getSubscriptionName: String
       def setSubscriptionName(value: String): Unit
       def getDatasetName: String
       def setDatasetName(value: String): Unit
     }
   
     import java.io.{InputStream, OutputStream}
     import com.google.api.services.bigquery.model._
     import org.apache.beam.sdk.coders._
     import org.apache.beam.sdk.transforms.SerializableFunction
     import org.joda.time.Instant
     import org.joda.time.format.ISODateTimeFormat
   
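      // One data point from a device (type, id, measurement, timestamp, value),
      // convertible to a TableRow for either target table layout.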
      case class Data[T](deviceType: String, timestamp: Instant, value: T, measurement: String, deviceId: Long) {
       def forDeviceMetricsRaw: TableRow = new TableRow().set(
         "device_id", deviceId
       ).set(
         "timestamp", ISODateTimeFormat.dateTime().print(timestamp)
       ).set(
         "value", value
       )
       def forIotTimeSeriesRaw: TableRow = new TableRow().set(
         "device_uuid", s"${deviceType}_$deviceId"
       ).set(
         "measurement", measurement
       ).set(
         "timestamp", ISODateTimeFormat.dateTime().print(timestamp)
       ).set(
         "value", value
       )
     }
   
     object Data {
       val coder: AtomicCoder[Data[Double]] = new AtomicCoder[Data[Double]] {
         private val stringCoder: StringUtf8Coder = StringUtf8Coder.of()
         private val instantCoder: InstantCoder = InstantCoder.of()
         private val doubleCoder: DoubleCoder = DoubleCoder.of()
         private val longCoder: VarLongCoder = VarLongCoder.of()
         def encode(value: Data[Double], os: OutputStream): Unit = {
           stringCoder.encode(value.deviceType, os)
           instantCoder.encode(value.timestamp, os)
           doubleCoder.encode(value.value, os)
           stringCoder.encode(value.measurement, os)
           longCoder.encode(value.deviceId, os)
         }
         def decode(is: InputStream): Data[Double] = {
           val deviceType = stringCoder.decode(is)
           val timestamp = instantCoder.decode(is)
           val value = doubleCoder.decode(is)
           val measurement = stringCoder.decode(is)
           val deviceId = longCoder.decode(is)
           Data(deviceType, timestamp, value, measurement, deviceId)
         }
       }
        class DataForDeviceMetricsRaw[T] extends SerializableFunction[Data[T], TableRow] {
          override def apply(input: Data[T]): TableRow = input.forDeviceMetricsRaw
       }
        class DataForIotTimeSeriesRaw[T] extends SerializableFunction[Data[T], TableRow] {
          override def apply(input: Data[T]): TableRow = input.forIotTimeSeriesRaw
       }
     }
   
     import java.util.function.Predicate
     import java.util.regex.Pattern
     import java.{lang, util}
     import com.awair.iot.timeseries.datapipeline
      import com.awair.iot.timeseries.v1beta1.Timeseries.{Downsampler, TimeSeries}
     import org.apache.beam.sdk.transforms.{InferableFunction, ProcessFunction}
     import org.joda.time.Instant
   
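      // Predicates and a converter for the incoming TimeSeries protos:
      // keep raw (non-downsampled), well-formed series and flatten them into Data elements.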
     object Timeseries {
        class IsValidTimeseries extends ProcessFunction[TimeSeries, lang.Boolean] {
          val deviceUuidHasValidCharactersPattern: Pattern = Pattern.compile("^[a-zA-Z0-9\\-\\.]+_[0-9]+$")
          val measurementHasValidCharactersPattern: Pattern = Pattern.compile("^[a-zA-Z0-9_\\-\\.]+$")
          override def apply(input: TimeSeries): lang.Boolean = {
            val deviceUuidIsNonNull: Predicate[lang.String] = (t: lang.String) => t != null
            val deviceUuidHasValidCharacters: Predicate[String] = deviceUuidHasValidCharactersPattern.asPredicate()
            val validateDeviceUuid: Predicate[lang.String] = deviceUuidIsNonNull and deviceUuidHasValidCharacters
            val measurementIsNonNull: Predicate[lang.String] = (t: lang.String) => t != null
            val measurementHasValidCharacters: Predicate[lang.String] = measurementHasValidCharactersPattern.asPredicate()
            val validateMeasurement: Predicate[lang.String] = measurementIsNonNull and measurementHasValidCharacters
            validateDeviceUuid.test(input.getDeviceUuid) && validateMeasurement.test(input.getMeasurement)
         }
       }
   
       class IsRawTimeseries extends ProcessFunction[TimeSeries, lang.Boolean] {
         override def apply(input: TimeSeries): lang.Boolean = {
           input.getDownsampler == Downsampler.getDefaultInstance
         }
       }
   
        class TimeseriesToIterableData extends InferableFunction[TimeSeries, lang.Iterable[Data[Double]]] {
         override def apply(input: TimeSeries): lang.Iterable[Data[Double]] = {
           val deviceTypeAndId = input.getDeviceUuid.split("_")
           val it = input
             .getDataPointsList
             .stream
             .map[Data[Double]] { dp =>
               Data(
                 deviceTypeAndId(0),
                 new Instant(dp.getTimestamp),
                 dp.getValue,
                 input.getMeasurement,
                 deviceTypeAndId(1).toLong
               )
             }.iterator()
           new lang.Iterable[Data[Double]] {
             override def iterator(): util.Iterator[Data[Double]] = it
           }
         }
       }
     }
   
     import java.util.regex.Pattern
      import com.google.api.services.bigquery.model.{Clustering, TableReference, TimePartitioning}
      import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryHelpers, TableDestination}
     import org.apache.beam.sdk.transforms.SerializableFunction
     import org.apache.beam.sdk.values.ValueInSingleWindow
   
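      // Dynamic destination: routes each element to a per-measurement/device-type table
      // ("<measurement>_<devicetype>"), day-partitioned on "timestamp" and clustered on "device_id",
      // so new tables can appear at any point in the stream.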
      class BigqueryDestinationFunc(project: String, datasetName: String) extends SerializableFunction[ValueInSingleWindow[Data[Double]], TableDestination] {
       private val validTableNamePattern = Pattern.compile("[^a-zA-Z0-9]")
        override def apply(input: ValueInSingleWindow[Data[Double]]): TableDestination = {
         new TableDestination(
           BigQueryHelpers.toTableSpec(
             new TableReference().setProjectId(
               project
             ).setDatasetId(
               datasetName
              ).setTableId(
                s"${validTableNamePattern.matcher(input.getValue.measurement).replaceAll("").toLowerCase}_${validTableNamePattern.matcher(input.getValue.deviceType).replaceAll("").toLowerCase}"
             )
           ),
           "",
            new TimePartitioning().setRequirePartitionFilter(true).setField("timestamp").setType("DAY"),
           new Clustering().setFields(java.util.Arrays.asList("device_id"))
         )
       }
     }
   
     def main(args: Array[String]): Unit = {
       val options = PipelineOptionsFactory
         .fromArgs(args: _*)
         .withValidation()
         .as(classOf[PipelineOptions])
       val pipeline = Pipeline.create(options)
       val project = options.getProject
       val subscriptionName = options.getSubscriptionName
       val datasetName = options.getDatasetName
       val rows = pipeline.apply(
         "read from pubsub",
         PubsubIO.readProtos(
           classOf[TimeSeries]
         ).fromSubscription(
           s"projects/$project/subscriptions/$subscriptionName"
         )
       ).apply(
         "filter for raw",
         Filter.by(new datapipeline.Timeseries.IsRawTimeseries)
       ).apply(
         "filter for valid",
         Filter.by(new datapipeline.Timeseries.IsValidTimeseries)
       ).apply(
         "flatmap into rows",
          FlatMapElements.via(new datapipeline.Timeseries.TimeseriesToIterableData)
       ).setCoder(Data.coder).apply(
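          // The combination at issue in BEAM-3200: FILE_LOADS with a triggering frequency,
          // dynamic destinations, and CREATE_IF_NEEDED.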
         "write to bigquery",
         BigQueryIO.write(
         ).withNumFileShards(
           8
         ).withFormatFunction(
           new Data.DataForDeviceMetricsRaw[Double]
         ).withSchema(
           BigQueryUtils.toTableSchema(
             Schema.of(
               Schema.Field.of("device_id", Schema.FieldType.INT64),
               Schema.Field.of("timestamp", Schema.FieldType.DATETIME),
               Schema.Field.of("value", Schema.FieldType.DOUBLE)
             )
           )
         ).to(
           new BigqueryDestinationFunc(project, datasetName)
         ).withClustering(
         ).optimizedWrites(
         ).withWriteDisposition(
           WriteDisposition.WRITE_APPEND
         ).withCreateDisposition(
           CreateDisposition.CREATE_IF_NEEDED
         ).withMethod(
           BigQueryIO.Write.Method.FILE_LOADS
         ).withTriggeringFrequency(
           Duration.standardMinutes(5)
         )
       )
       pipeline.run()
       /*
       deployment as:
       sbt "runMain 
com.awair.iot.timeseries.datapipeline.bigquery.write.devicemetricsraw.DeviceMetricsRaw\
       --maxNumWorkers=4\
       --numWorkers=4\
       --workerMachineType=n1-standard-4\
       --runner=DataflowRunner\
       --project=awair-prd\
       --jobName=iot-time-series-datapipeline\
       --subscriptionName=iot-time-series-datapipeline\
       --datasetName=device_metrics_raw\
       --region=us-central1\
       --autoscalingAlgorithm=THROUGHPUT_BASED\
       --workerDiskType=compute.googleapis.com/projects//zones//diskTypes/pd-ssd
       --diskSizeGb=100\
       --update"
        */
     }
   }
   ```
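    To make the dynamic-destination behaviour concrete, here is a small sketch (the measurement and device-type values are made up) of the table id that the BigqueryDestinationFunc above derives; a previously unseen measurement or device type yields a table the pipeline has never written to, which is why the create disposition also matters on later load-job triggers:
    ```
    import java.util.regex.Pattern

    // Hypothetical sample values, for illustration only:
    val measurement = "co2"
    val deviceType = "awair-r2"

    // Same derivation as BigqueryDestinationFunc above; prints "co2_awairr2".
    val validTableNamePattern = Pattern.compile("[^a-zA-Z0-9]")
    val tableId =
      s"${validTableNamePattern.matcher(measurement).replaceAll("").toLowerCase}_" +
        s"${validTableNamePattern.matcher(deviceType).replaceAll("").toLowerCase}"
    println(tableId)
    ```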
    Protobuf used to generate the TimeSeries type (Java)
   ```
   syntax = "proto3";
   
   package awair.iot.timeseries.v1beta1;
   
   option java_package = "com.awair.iot.timeseries.v1beta1";
   
   import "google/protobuf/empty.proto";
   
   message TimeSeries {
       string device_uuid = 1;
       string measurement = 2;
       repeated DataPoint data_points = 3;
       Downsampler downsampler = 4;
   }
   
   message DataPoint {
       int64 timestamp = 1;
       double value = 2;
   }
   
   message Downsampler {
       enum Method {
           MAX = 0;
           MIN = 1;
           COUNT = 2;
           SUM = 3;
           AVG = 4;
       }
       Method method = 1;
       int32 interval_seconds = 2;
   }
   
   service IotTimeSeriesService {
       rpc GetTimeSeries (GetTimeSeriesRequest) returns (TimeSeries);
   }
   
   message GetTimeSeriesRequest {
       string device_uuid = 1;
       string measurement = 2;
       Downsampler downsampler = 3;
       int64 start_time = 4;
       int64 end_time = 5;
       int64 max_data_points = 6;
   }
   ```
   
   If you want to do live debugging on this project, I can see about arranging temporary access to our backend so that you can
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 314577)
    Time Spent: 5.5h  (was: 5h 20m)

> Streaming Pipeline throws RuntimeException when using DynamicDestinations and 
> Method.FILE_LOADS
> -----------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3200
>                 URL: https://issues.apache.org/jira/browse/BEAM-3200
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.2.0
>            Reporter: AJ
>            Priority: Critical
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> I am trying to use Method.FILE_LOADS for loading data into BQ in my streaming 
> pipeline using the 2.2.0 RC3 release. I am writing to around 500 tables using 
> DynamicDestinations, and I am also using 
> withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED). Everything works 
> fine the first time the BigQuery load jobs get triggered, but on subsequent 
> triggers the pipeline throws a RuntimeException saying the table was not found, 
> even though I created the pipeline with CreateDisposition.CREATE_IF_NEEDED. 
> The exact exception is:
> {code}
> java.lang.RuntimeException: Failed to create load job with id prefix 
> 717aed9ed1ef4aa7a616e1132f8b7f6d_a0928cae3d670b32f01ab2d9fe5cc0ee_00001_00001,
>  reached max retries: 3, last failed load job: {
>   "configuration" : {
>     "load" : {
>       "createDisposition" : "CREATE_NEVER",
>       "destinationTable" : {
>         "datasetId" : ...,
>         "projectId" : ...,
>         "tableId" : ....
>       },
>     "errors" : [ {
>       "message" : "Not found: Table ....,
>       "reason" : "notFound"
>     } ],
> {code}
> My theory is that all the subsequent load jobs get triggered using the 
> CREATE_NEVER disposition, and this might be due to 
> https://github.com/apache/beam/blob/release-2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L140
> When using DynamicDestinations, all the destination tables might not be known 
> during the first trigger, and hence the pipeline's create disposition should 
> be respected.
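> Roughly, the theory above amounts to the effective create disposition being chosen 
> per pane rather than per destination. A hypothetical sketch of that suspected 
> behaviour (not the actual WriteTables source):
> {code}
> // Hypothetical sketch of the suspected per-pane choice: only the first pane gets
> // the user's configured disposition; later panes fall back to CREATE_NEVER.
> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition
>
> def effectiveCreateDisposition(paneIndex: Long, configured: CreateDisposition): CreateDisposition =
>   if (paneIndex == 0) configured else CreateDisposition.CREATE_NEVER
> {code}
> With DynamicDestinations a table can appear for the first time in a later pane, so 
> falling back to CREATE_NEVER there breaks CREATE_IF_NEEDED.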



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
