[ 
https://issues.apache.org/jira/browse/BEAM-3200?focusedWorklogId=314580&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-314580
 ]

ASF GitHub Bot logged work on BEAM-3200:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Sep/19 19:00
            Start Date: 18/Sep/19 19:00
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on issue #9556: [BEAM-3200, 
BEAM-3772] Fix: Honor create and write disposition with dynamic destinations
URL: https://github.com/apache/beam/pull/9556#issuecomment-532821449
 
 
   Looking at the code, I think the support for clustering (which was added
   recently) is a bit broken, especially with dynamic destinations.
   TableDestinationCoderV3 (which encodes clustering information) is not
   invoked if you use DynamicDestinations, and BatchLoads.java hard codes
   TableDestinationCoderV2 in several places.
   
   On Wed, Sep 18, 2019 at 11:54 AM Reuven Lax <re...@google.com> wrote:
   
   > It looks like you're not calling withTimePartitioning on BigQueryIO.
   >
   > On Wed, Sep 18, 2019 at 11:47 AM Brian Bradley <notificati...@github.com>
   > wrote:
   >
   >> Data pipeline that (eventually) produces the erroneous behavior
   >> `
   >> package
   >> com.awair.iot.timeseries.datapipeline.bigquery.write.devicemetricsraw
   >>
   >> import org.apache.beam.sdk.Pipeline
   >> import
   >> org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition,
   >> WriteDisposition}
   >> import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, BigQueryUtils}
   >> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
   >> import org.apache.beam.sdk.options.PipelineOptionsFactory
   >> import org.apache.beam.sdk.schemas.Schema
   >> import org.apache.beam.sdk.transforms.{Filter, FlatMapElements}
   >> import org.joda.time.Duration
   >>
   >> object DeviceMetricsRaw {
   >>
   >> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
   >>
   >> trait PipelineOptions extends DataflowPipelineOptions {
   >>
   >> def getSubscriptionName: String
   >> def setSubscriptionName(value: String): Unit
   >>
   >> def getDatasetName: String
   >> def setDatasetName(value: String): Unit
   >>
   >> }
   >>
   >> import java.io.{InputStream, OutputStream}
   >>
   >> import com.google.api.services.bigquery.model._
   >> import org.apache.beam.sdk.coders._
   >> import org.apache.beam.sdk.transforms.SerializableFunction
   >> import org.joda.time.Instant
   >> import org.joda.time.format.ISODateTimeFormat
   >>
   >> case class Data[T](deviceType: String, timestamp: Instant, value: T,
   >> measurement: String, deviceId: Long) {
   >>
   >> def forDeviceMetricsRaw: TableRow = new TableRow().set(
   >>   "device_id", deviceId
   >> ).set(
   >>   "timestamp", ISODateTimeFormat.dateTime().print(timestamp)
   >> ).set(
   >>   "value", value
   >> )
   >>
   >> def forIotTimeSeriesRaw: TableRow = new TableRow().set(
   >>   "device_uuid", s"${deviceType}_$deviceId"
   >> ).set(
   >>   "measurement", measurement
   >> ).set(
   >>   "timestamp", ISODateTimeFormat.dateTime().print(timestamp)
   >> ).set(
   >>   "value", value
   >> )
   >>
   >> }
   >>
   >> object Data {
   >> val coder: AtomicCoder[Data[Double]] = new AtomicCoder[Data[Double]] {
   >> private val stringCoder: StringUtf8Coder = StringUtf8Coder.of()
   >> private val instantCoder: InstantCoder = InstantCoder.of()
   >> private val doubleCoder: DoubleCoder = DoubleCoder.of()
   >> private val longCoder: VarLongCoder = VarLongCoder.of()
   >>
   >>   def encode(value: Data[Double], os: OutputStream): Unit = {
   >>     stringCoder.encode(value.deviceType, os)
   >>     instantCoder.encode(value.timestamp, os)
   >>     doubleCoder.encode(value.value, os)
   >>     stringCoder.encode(value.measurement, os)
   >>     longCoder.encode(value.deviceId, os)
   >>   }
   >>   def decode(is: InputStream): Data[Double] = {
   >>     val deviceType = stringCoder.decode(is)
   >>     val timestamp = instantCoder.decode(is)
   >>     val value = doubleCoder.decode(is)
   >>     val measurement = stringCoder.decode(is)
   >>     val deviceId = longCoder.decode(is)
   >>     Data(deviceType, timestamp, value, measurement, deviceId)
   >>   }
   >> }
   >>
   >> class DataForDeviceMetricsRaw[T] extends SerializableFunction[Data[T], 
TableRow] {
   >>   override def apply(input: Data[T]): TableRow = input.forDeviceMetricsRaw
   >> }
   >>
   >> class DataForIotTimeSeriesRaw[T] extends SerializableFunction[Data[T], 
TableRow] {
   >>   override def apply(input: Data[T]): TableRow = input.forIotTimeSeriesRaw
   >> }
   >>
   >> }
   >>
   >> import java.util.function.Predicate
   >> import java.util.regex.Pattern
   >> import java.{lang, util}
   >>
   >> import com.awair.iot.timeseries.datapipeline
   >> import com.awair.iot.timeseries.v1beta1.Timeseries.{Downsampler,
   >> TimeSeries}
   >> import org.apache.beam.sdk.transforms.{InferableFunction, ProcessFunction}
   >> import org.joda.time.Instant
   >>
   >> object Timeseries {
   >>
   >> class IsValidTimeseries extends ProcessFunction[TimeSeries, lang.Boolean] 
{
   >>   val deviceUuidHasValidCharactersPattern: Pattern = 
Pattern.compile("^[a-zA-Z0-9\\-\\.]+_[0-9]+$")
   >>   val measurementHasValidCharactersPattern: Pattern = 
Pattern.compile("^[a-zA-Z0-9_\\-\\.]+$")
   >>   override def apply(input: TimeSeries): lang.Boolean = {
   >>     val deviceUuidIsNonNull: Predicate[lang.String] = (t: lang.String) => 
t != null
   >>     val deviceUuidHasValidCharacters: Predicate[String] = 
deviceUuidHasValidCharactersPattern.asPredicate()
   >>     val validateDeviceUuid: Predicate[lang.String] = deviceUuidIsNonNull 
and deviceUuidHasValidCharacters
   >>
   >>     val measurementIsNonNull: Predicate[lang.String] = (t: lang.String) 
=> t != null
   >>     val measurementHasValidCharacters: Predicate[lang.String] = 
measurementHasValidCharactersPattern.asPredicate()
   >>     val validateMeasurement: Predicate[lang.String] = 
measurementIsNonNull and measurementHasValidCharacters
   >>
   >>     validateDeviceUuid.test(input.getDeviceUuid) && 
validateMeasurement.test(input.getMeasurement)
   >>   }
   >> }
   >>
   >> class IsRawTimeseries extends ProcessFunction[TimeSeries, lang.Boolean] {
   >>   override def apply(input: TimeSeries): lang.Boolean = {
   >>     input.getDownsampler == Downsampler.getDefaultInstance
   >>   }
   >> }
   >>
   >> class TimeseriesToIterableData extends InferableFunction[TimeSeries, 
lang.Iterable[Data[Double]]] {
   >>   override def apply(input: TimeSeries): lang.Iterable[Data[Double]] = {
   >>     val deviceTypeAndId = input.getDeviceUuid.split("_")
   >>     val it = input
   >>       .getDataPointsList
   >>       .stream
   >>       .map[Data[Double]] { dp =>
   >>         Data(
   >>           deviceTypeAndId(0),
   >>           new Instant(dp.getTimestamp),
   >>           dp.getValue,
   >>           input.getMeasurement,
   >>           deviceTypeAndId(1).toLong
   >>         )
   >>       }.iterator()
   >>     new lang.Iterable[Data[Double]] {
   >>       override def iterator(): util.Iterator[Data[Double]] = it
   >>     }
   >>   }
   >> }
   >>
   >> }
   >>
   >> import java.util.regex.Pattern
   >>
   >> import com.google.api.services.bigquery.model.{Clustering,
   >> TableReference, TimePartitioning}
   >> import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryHelpers,
   >> TableDestination}
   >> import org.apache.beam.sdk.transforms.SerializableFunction
   >> import org.apache.beam.sdk.values.ValueInSingleWindow
   >>
   >> class BigqueryDestinationFunc(project: String, datasetName: String)
   >> extends SerializableFunction[ValueInSingleWindow[Data[Double]],
   >> TableDestination] {
   >> private val validTableNamePattern = Pattern.compile("[^a-zA-Z0-9]")
   >> override def apply(input: ValueInSingleWindow[Data[Double]]):
   >> TableDestination = {
   >> new TableDestination(
   >> BigQueryHelpers.toTableSpec(
   >> new TableReference().setProjectId(
   >> project
   >> ).setDatasetId(
   >> datasetName
   >> ).setTableId(
   >>
   >> 
s"${validTableNamePattern.matcher(input.getValue.measurement).replaceAll("").toLowerCase}_${validTableNamePattern.matcher(input.getValue.deviceType).replaceAll("").toLowerCase}"
   >> )
   >> ),
   >> "",
   >> new
   >> 
TimePartitioning().setRequirePartitionFilter(true).setField("timestamp").setType("DAY"),
   >> new Clustering().setFields(java.util.Arrays.asList("device_id"))
   >> )
   >> }
   >> }
   >>
   >> def main(args: Array[String]): Unit = {
   >>
   >> val options = PipelineOptionsFactory
   >>   .fromArgs(args: _*)
   >>   .withValidation()
   >>   .as(classOf[PipelineOptions])
   >>
   >> val pipeline = Pipeline.create(options)
   >> val project = options.getProject
   >> val subscriptionName = options.getSubscriptionName
   >> val datasetName = options.getDatasetName
   >>
   >> val rows = pipeline.apply(
   >>   "read from pubsub",
   >>   PubsubIO.readProtos(
   >>     classOf[TimeSeries]
   >>   ).fromSubscription(
   >>     s"projects/$project/subscriptions/$subscriptionName"
   >>   )
   >> ).apply(
   >>   "filter for raw",
   >>   Filter.by(new datapipeline.Timeseries.IsRawTimeseries)
   >> ).apply(
   >>   "filter for valid",
   >>   Filter.by(new datapipeline.Timeseries.IsValidTimeseries)
   >> ).apply(
   >>   "flatmap into rows",
   >>   FlatMapElements.via(new 
datapipeline.Timeseries.TimeseriesToIterableData)
   >> ).setCoder(Data.coder).apply(
   >>   "write to bigquery",
   >>   BigQueryIO.write(
   >>   ).withNumFileShards(
   >>     8
   >>   ).withFormatFunction(
   >>     new Data.DataForDeviceMetricsRaw[Double]
   >>   ).withSchema(
   >>     BigQueryUtils.toTableSchema(
   >>       Schema.of(
   >>         Schema.Field.of("device_id", Schema.FieldType.INT64),
   >>         Schema.Field.of("timestamp", Schema.FieldType.DATETIME),
   >>         Schema.Field.of("value", Schema.FieldType.DOUBLE)
   >>       )
   >>     )
   >>   ).to(
   >>     new BigqueryDestinationFunc(project, datasetName)
   >>   ).withClustering(
   >>   ).optimizedWrites(
   >>   ).withWriteDisposition(
   >>     WriteDisposition.WRITE_APPEND
   >>   ).withCreateDisposition(
   >>     CreateDisposition.CREATE_IF_NEEDED
   >>   ).withMethod(
   >>     BigQueryIO.Write.Method.FILE_LOADS
   >>   ).withTriggeringFrequency(
   >>     Duration.standardMinutes(5)
   >>   )
   >> )
   >>
   >> pipeline.run()
   >>
   >> /*
   >> deployment as:
   >>
   >> sbt "runMain 
com.awair.iot.timeseries.datapipeline.bigquery.write.devicemetricsraw.DeviceMetricsRaw\
   >> --maxNumWorkers=4\
   >> --numWorkers=4\
   >> --workerMachineType=n1-standard-4\
   >> --runner=DataflowRunner\
   >> --project=awair-prd\
   >> --jobName=iot-time-series-datapipeline\
   >> --subscriptionName=iot-time-series-datapipeline\
   >> --datasetName=device_metrics_raw\
   >> --region=us-central1\
   >> --autoscalingAlgorithm=THROUGHPUT_BASED\
   >> --workerDiskType=compute.googleapis.com/projects//zones//diskTypes/pd-ssd
   >> --diskSizeGb=100\
   >> --update 
<http://compute.googleapis.com/projects//zones//diskTypes/pd-ssd--diskSizeGb=100%5C--update>"
   >>  */
   >>
   >> }
   >> }
   >> `
   >>
   >> Protobuf to generate Timeseries type (java)
   >> `
   >> syntax = "proto3";
   >>
   >> package awair.iot.timeseries.v1beta1;
   >> option java_package = "com.awair.iot.timeseries.v1beta1";
   >>
   >> import "google/protobuf/empty.proto";
   >>
   >> message TimeSeries {
   >> string device_uuid = 1;
   >> string measurement = 2;
   >> repeated DataPoint data_points = 3;
   >> Downsampler downsampler = 4;
   >> }
   >>
   >> message DataPoint {
   >> int64 timestamp = 1;
   >> double value = 2;
   >> }
   >>
   >> message Downsampler {
   >>
   >> enum Method {
   >>     MAX = 0;
   >>     MIN = 1;
   >>     COUNT = 2;
   >>     SUM = 3;
   >>     AVG = 4;
   >> }
   >>
   >> Method method = 1;
   >> int32 interval_seconds = 2;
   >>
   >> }
   >>
   >> service IotTimeSeriesService {
   >> rpc GetTimeSeries (GetTimeSeriesRequest) returns (TimeSeries);
   >> }
   >>
   >> message GetTimeSeriesRequest {
   >> string device_uuid = 1;
   >> string measurement = 2;
   >> Downsampler downsampler = 3;
   >> int64 start_time = 4;
   >> int64 end_time = 5;
   >> int64 max_data_points = 6;
   >> }
   >> `
   >>
   >> If you want to do live debugging on this project, I can see about
   >> arranging temporary access to our backend so that you can
   >>
   >> —
   >> You are receiving this because you were mentioned.
   >> Reply to this email directly, view it on GitHub
   >> 
<https://github.com/apache/beam/pull/9556?email_source=notifications&email_token=AFAYJVNJ7FXWZEPTPJGUUYTQKJZUVA5CNFSM4IWBC3BKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOD7BCEPY#issuecomment-532816447>,
   >> or mute the thread
   >> 
<https://github.com/notifications/unsubscribe-auth/AFAYJVMMNV3JA6UORRF7QRDQKJZUVANCNFSM4IWBC3BA>
   >> .
   >>
   >
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 314580)
    Time Spent: 5h 50m  (was: 5h 40m)

> Streaming Pipeline throws RuntimeException when using DynamicDestinations and 
> Method.FILE_LOADS
> -----------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3200
>                 URL: https://issues.apache.org/jira/browse/BEAM-3200
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.2.0
>            Reporter: AJ
>            Priority: Critical
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> I am trying to use Method.FILE_LOADS for loading data into BQ in my streaming 
> pipeline using RC3 release of 2.2.0. I am writing to around 500 tables using 
> DynamicDestinations and I am also using 
> withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED). Everything works 
> fine the first time the BigQuery load jobs get triggered. But on subsequent 
> triggers the pipeline throws a RuntimeException about a table not being 
> found, even though I created the pipeline with 
> CreateDisposition.CREATE_IF_NEEDED. The exact exception is:
> {code}
> java.lang.RuntimeException: Failed to create load job with id prefix 
> 717aed9ed1ef4aa7a616e1132f8b7f6d_a0928cae3d670b32f01ab2d9fe5cc0ee_00001_00001,
>  reached max retries: 3, last failed load job: {
>   "configuration" : {
>     "load" : {
>       "createDisposition" : "CREATE_NEVER",
>       "destinationTable" : {
>         "datasetId" : ...,
>         "projectId" : ...,
>         "tableId" : ....
>       },
>     "errors" : [ {
>       "message" : "Not found: Table ....,
>       "reason" : "notFound"
>     } ],
> {code}
> My theory is that all the subsequent load jobs get triggered using the 
> CREATE_NEVER disposition, and 
> this might be due to 
> https://github.com/apache/beam/blob/release-2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L140
> When using DynamicDestinations, not all the destination tables may be known 
> during the first trigger, and hence the pipeline's create disposition should 
> be respected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to