[ https://issues.apache.org/jira/browse/BEAM-3200?focusedWorklogId=314577&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-314577 ]
ASF GitHub Bot logged work on BEAM-3200: ---------------------------------------- Author: ASF GitHub Bot Created on: 18/Sep/19 18:56 Start Date: 18/Sep/19 18:56 Worklog Time Spent: 10m Work Description: bcbradle commented on issue #9556: [BEAM-3200, BEAM-3772] Fix: Honor create and write disposition with dynamic destinations URL: https://github.com/apache/beam/pull/9556#issuecomment-532816447 ``` Data pipeline that (eventually) produces the erroneous behavior package com.awair.iot.timeseries.datapipeline.bigquery.write.devicemetricsraw import org.apache.beam.sdk.Pipeline import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition} import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, BigQueryUtils} import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO import org.apache.beam.sdk.options.PipelineOptionsFactory import org.apache.beam.sdk.schemas.Schema import org.apache.beam.sdk.transforms.{Filter, FlatMapElements} import org.joda.time.Duration object DeviceMetricsRaw { import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions trait PipelineOptions extends DataflowPipelineOptions { def getSubscriptionName: String def setSubscriptionName(value: String): Unit def getDatasetName: String def setDatasetName(value: String): Unit } import java.io.{InputStream, OutputStream} import com.google.api.services.bigquery.model._ import org.apache.beam.sdk.coders._ import org.apache.beam.sdk.transforms.SerializableFunction import org.joda.time.Instant import org.joda.time.format.ISODateTimeFormat case class Data[T](deviceType: String, timestamp: Instant, value: T, measurement: String, deviceId: Long) { def forDeviceMetricsRaw: TableRow = new TableRow().set( "device_id", deviceId ).set( "timestamp", ISODateTimeFormat.dateTime().print(timestamp) ).set( "value", value ) def forIotTimeSeriesRaw: TableRow = new TableRow().set( "device_uuid", s"${deviceType}_$deviceId" ).set( "measurement", measurement ).set( "timestamp", ISODateTimeFormat.dateTime().print(timestamp) ).set( "value", value ) } object Data { val coder: AtomicCoder[Data[Double]] = new AtomicCoder[Data[Double]] { private val stringCoder: StringUtf8Coder = StringUtf8Coder.of() private val instantCoder: InstantCoder = InstantCoder.of() private val doubleCoder: DoubleCoder = DoubleCoder.of() private val longCoder: VarLongCoder = VarLongCoder.of() def encode(value: Data[Double], os: OutputStream): Unit = { stringCoder.encode(value.deviceType, os) instantCoder.encode(value.timestamp, os) doubleCoder.encode(value.value, os) stringCoder.encode(value.measurement, os) longCoder.encode(value.deviceId, os) } def decode(is: InputStream): Data[Double] = { val deviceType = stringCoder.decode(is) val timestamp = instantCoder.decode(is) val value = doubleCoder.decode(is) val measurement = stringCoder.decode(is) val deviceId = longCoder.decode(is) Data(deviceType, timestamp, value, measurement, deviceId) } } class DataForDeviceMetricsRaw[T] extends SerializableFunction[Data[T], TableRow] { override def apply(input: Data[T]): TableRow = input.forDeviceMetricsRaw } class DataForIotTimeSeriesRaw[T] extends SerializableFunction[Data[T], TableRow] { override def apply(input: Data[T]): TableRow = input.forIotTimeSeriesRaw } } import java.util.function.Predicate import java.util.regex.Pattern import java.{lang, util} import com.awair.iot.timeseries.datapipeline import com.awair.iot.timeseries.v1beta1.Timeseries.{Downsampler, TimeSeries} import org.apache.beam.sdk.transforms.{InferableFunction, ProcessFunction} import org.joda.time.Instant object Timeseries { class IsValidTimeseries extends ProcessFunction[TimeSeries, lang.Boolean] { val deviceUuidHasValidCharactersPattern: Pattern = Pattern.compile("^[a-zA-Z0-9\\-\\.]+_[0-9]+$") val measurementHasValidCharactersPattern: Pattern = Pattern.compile("^[a-zA-Z0-9_\\-\\.]+$") override def apply(input: TimeSeries): lang.Boolean = { val deviceUuidIsNonNull: Predicate[lang.String] = (t: lang.String) => t != null val deviceUuidHasValidCharacters: Predicate[String] = deviceUuidHasValidCharactersPattern.asPredicate() val validateDeviceUuid: Predicate[lang.String] = deviceUuidIsNonNull and deviceUuidHasValidCharacters val measurementIsNonNull: Predicate[lang.String] = (t: lang.String) => t != null val measurementHasValidCharacters: Predicate[lang.String] = measurementHasValidCharactersPattern.asPredicate() val validateMeasurement: Predicate[lang.String] = measurementIsNonNull and measurementHasValidCharacters validateDeviceUuid.test(input.getDeviceUuid) && validateMeasurement.test(input.getMeasurement) } } class IsRawTimeseries extends ProcessFunction[TimeSeries, lang.Boolean] { override def apply(input: TimeSeries): lang.Boolean = { input.getDownsampler == Downsampler.getDefaultInstance } } class TimeseriesToIterableData extends InferableFunction[TimeSeries, lang.Iterable[Data[Double]]] { override def apply(input: TimeSeries): lang.Iterable[Data[Double]] = { val deviceTypeAndId = input.getDeviceUuid.split("_") val it = input .getDataPointsList .stream .map[Data[Double]] { dp => Data( deviceTypeAndId(0), new Instant(dp.getTimestamp), dp.getValue, input.getMeasurement, deviceTypeAndId(1).toLong ) }.iterator() new lang.Iterable[Data[Double]] { override def iterator(): util.Iterator[Data[Double]] = it } } } } import java.util.regex.Pattern import com.google.api.services.bigquery.model.{Clustering, TableReference, TimePartitioning} import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryHelpers, TableDestination} import org.apache.beam.sdk.transforms.SerializableFunction import org.apache.beam.sdk.values.ValueInSingleWindow class BigqueryDestinationFunc(project: String, datasetName: String) extends SerializableFunction[ValueInSingleWindow[Data[Double]], TableDestination] { private val validTableNamePattern = Pattern.compile("[^a-zA-Z0-9]") override def apply(input: ValueInSingleWindow[Data[Double]]): TableDestination = { new TableDestination( BigQueryHelpers.toTableSpec( new TableReference().setProjectId( project ).setDatasetId( datasetName ).setTableId( s"${validTableNamePattern.matcher(input.getValue.measurement).replaceAll("").toLowerCase}_${validTableNamePattern.matcher(input.getValue.deviceType).replaceAll("").toLowerCase}" ) ), "", new TimePartitioning().setRequirePartitionFilter(true).setField("timestamp").setType("DAY"), new Clustering().setFields(java.util.Arrays.asList("device_id")) ) } } def main(args: Array[String]): Unit = { val options = PipelineOptionsFactory .fromArgs(args: _*) .withValidation() .as(classOf[PipelineOptions]) val pipeline = Pipeline.create(options) val project = options.getProject val subscriptionName = options.getSubscriptionName val datasetName = options.getDatasetName val rows = pipeline.apply( "read from pubsub", PubsubIO.readProtos( classOf[TimeSeries] ).fromSubscription( s"projects/$project/subscriptions/$subscriptionName" ) ).apply( "filter for raw", Filter.by(new datapipeline.Timeseries.IsRawTimeseries) ).apply( "filter for valid", Filter.by(new datapipeline.Timeseries.IsValidTimeseries) ).apply( "flatmap into rows", FlatMapElements.via(new datapipeline.Timeseries.TimeseriesToIterableData) ).setCoder(Data.coder).apply( "write to bigquery", BigQueryIO.write( ).withNumFileShards( 8 ).withFormatFunction( new Data.DataForDeviceMetricsRaw[Double] ).withSchema( BigQueryUtils.toTableSchema( Schema.of( Schema.Field.of("device_id", Schema.FieldType.INT64), Schema.Field.of("timestamp", Schema.FieldType.DATETIME), Schema.Field.of("value", Schema.FieldType.DOUBLE) ) ) ).to( new BigqueryDestinationFunc(project, datasetName) ).withClustering( ).optimizedWrites( ).withWriteDisposition( WriteDisposition.WRITE_APPEND ).withCreateDisposition( CreateDisposition.CREATE_IF_NEEDED ).withMethod( BigQueryIO.Write.Method.FILE_LOADS ).withTriggeringFrequency( Duration.standardMinutes(5) ) ) pipeline.run() /* deployment as: sbt "runMain com.awair.iot.timeseries.datapipeline.bigquery.write.devicemetricsraw.DeviceMetricsRaw\ --maxNumWorkers=4\ --numWorkers=4\ --workerMachineType=n1-standard-4\ --runner=DataflowRunner\ --project=awair-prd\ --jobName=iot-time-series-datapipeline\ --subscriptionName=iot-time-series-datapipeline\ --datasetName=device_metrics_raw\ --region=us-central1\ --autoscalingAlgorithm=THROUGHPUT_BASED\ --workerDiskType=compute.googleapis.com/projects//zones//diskTypes/pd-ssd --diskSizeGb=100\ --update" */ } } ``` Protobuf to generate Timeseries type (java) ``` syntax = "proto3"; package awair.iot.timeseries.v1beta1; option java_package = "com.awair.iot.timeseries.v1beta1"; import "google/protobuf/empty.proto"; message TimeSeries { string device_uuid = 1; string measurement = 2; repeated DataPoint data_points = 3; Downsampler downsampler = 4; } message DataPoint { int64 timestamp = 1; double value = 2; } message Downsampler { enum Method { MAX = 0; MIN = 1; COUNT = 2; SUM = 3; AVG = 4; } Method method = 1; int32 interval_seconds = 2; } service IotTimeSeriesService { rpc GetTimeSeries (GetTimeSeriesRequest) returns (TimeSeries); } message GetTimeSeriesRequest { string device_uuid = 1; string measurement = 2; Downsampler downsampler = 3; int64 start_time = 4; int64 end_time = 5; int64 max_data_points = 6; } ``` If you want to do a live debugging in this project I can see about arranging temporary access to our backend so that you can ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 314577) Time Spent: 5.5h (was: 5h 20m) > Streaming Pipeline throws RuntimeException when using DynamicDestinations and > Method.FILE_LOADS > ----------------------------------------------------------------------------------------------- > > Key: BEAM-3200 > URL: https://issues.apache.org/jira/browse/BEAM-3200 > Project: Beam > Issue Type: Bug > Components: io-java-gcp > Affects Versions: 2.2.0 > Reporter: AJ > Priority: Critical > Time Spent: 5.5h > Remaining Estimate: 0h > > I am trying to use Method.FILE_LOADS for loading data into BQ in my streaming > pipeline using RC3 release of 2.2.0. I am writing to around 500 tables using > DynamicDestinations and I am also using > withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED). Everything works > fine when the first time bigquery load jobs get triggered. But on subsequent > triggers pipeline throws a RuntimeException about table not found even though > I created the pipeline with CreateDisposition.CREATE_IF_NEEDED. The exact > exception is: > {code} > java.lang.RuntimeException: Failed to create load job with id prefix > 717aed9ed1ef4aa7a616e1132f8b7f6d_a0928cae3d670b32f01ab2d9fe5cc0ee_00001_00001, > reached max retries: 3, last failed load job: { > "configuration" : { > "load" : { > "createDisposition" : "CREATE_NEVER", > "destinationTable" : { > "datasetId" : ..., > "projectId" : ..., > "tableId" : .... > }, > "errors" : [ } > "message" : "Not found: Table ...., > "reason" : "notFound" > } ], > {code} > My theory is all the subsequent load jobs get trigged using CREATE_NEVER > disposition and > this might be due to > https://github.com/apache/beam/blob/release-2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L140 > When using DynamicDestinations all the destination tables might not be known > during the first trigger and hence the pipeline's create disposition should > be respected. -- This message was sent by Atlassian Jira (v8.3.4#803005)