[
https://issues.apache.org/jira/browse/BEAM-3200?focusedWorklogId=314572&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-314572
]
ASF GitHub Bot logged work on BEAM-3200:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Sep/19 18:52
Start Date: 18/Sep/19 18:52
Worklog Time Spent: 10m
Work Description: bcbradle commented on issue #9556: [BEAM-3200,
BEAM-3772] Fix: Honor create and write disposition with dynamic destinations
URL: https://github.com/apache/beam/pull/9556#issuecomment-532816447
Data pipeline that (eventually) produces the erroneous behavior
`package
com.awair.iot.timeseries.datapipeline.bigquery.write.devicemetricsraw
import org.apache.beam.sdk.Pipeline
import
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition,
WriteDisposition}
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, BigQueryUtils}
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.transforms.{Filter, FlatMapElements}
import org.joda.time.Duration
object DeviceMetricsRaw {
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
trait PipelineOptions extends DataflowPipelineOptions {
def getSubscriptionName: String
def setSubscriptionName(value: String): Unit
def getDatasetName: String
def setDatasetName(value: String): Unit
}
import java.io.{InputStream, OutputStream}
import com.google.api.services.bigquery.model._
import org.apache.beam.sdk.coders._
import org.apache.beam.sdk.transforms.SerializableFunction
import org.joda.time.Instant
import org.joda.time.format.ISODateTimeFormat
case class Data[T](deviceType: String, timestamp: Instant, value: T,
measurement: String, deviceId: Long) {
def forDeviceMetricsRaw: TableRow = new TableRow().set(
"device_id", deviceId
).set(
"timestamp", ISODateTimeFormat.dateTime().print(timestamp)
).set(
"value", value
)
def forIotTimeSeriesRaw: TableRow = new TableRow().set(
"device_uuid", s"${deviceType}_$deviceId"
).set(
"measurement", measurement
).set(
"timestamp", ISODateTimeFormat.dateTime().print(timestamp)
).set(
"value", value
)
}
object Data {
val coder: AtomicCoder[Data[Double]] = new AtomicCoder[Data[Double]] {
private val stringCoder: StringUtf8Coder = StringUtf8Coder.of()
private val instantCoder: InstantCoder = InstantCoder.of()
private val doubleCoder: DoubleCoder = DoubleCoder.of()
private val longCoder: VarLongCoder = VarLongCoder.of()
def encode(value: Data[Double], os: OutputStream): Unit = {
stringCoder.encode(value.deviceType, os)
instantCoder.encode(value.timestamp, os)
doubleCoder.encode(value.value, os)
stringCoder.encode(value.measurement, os)
longCoder.encode(value.deviceId, os)
}
def decode(is: InputStream): Data[Double] = {
val deviceType = stringCoder.decode(is)
val timestamp = instantCoder.decode(is)
val value = doubleCoder.decode(is)
val measurement = stringCoder.decode(is)
val deviceId = longCoder.decode(is)
Data(deviceType, timestamp, value, measurement, deviceId)
}
}
class DataForDeviceMetricsRaw[T] extends SerializableFunction[Data[T],
TableRow] {
override def apply(input: Data[T]): TableRow =
input.forDeviceMetricsRaw
}
class DataForIotTimeSeriesRaw[T] extends SerializableFunction[Data[T],
TableRow] {
override def apply(input: Data[T]): TableRow =
input.forIotTimeSeriesRaw
}
}
import java.util.function.Predicate
import java.util.regex.Pattern
import java.{lang, util}
import com.awair.iot.timeseries.datapipeline
import com.awair.iot.timeseries.v1beta1.Timeseries.{Downsampler,
TimeSeries}
import org.apache.beam.sdk.transforms.{InferableFunction, ProcessFunction}
import org.joda.time.Instant
object Timeseries {
class IsValidTimeseries extends ProcessFunction[TimeSeries,
lang.Boolean] {
val deviceUuidHasValidCharactersPattern: Pattern =
Pattern.compile("^[a-zA-Z0-9\\-\\.]+_[0-9]+$")
val measurementHasValidCharactersPattern: Pattern =
Pattern.compile("^[a-zA-Z0-9_\\-\\.]+$")
override def apply(input: TimeSeries): lang.Boolean = {
val deviceUuidIsNonNull: Predicate[lang.String] = (t: lang.String)
=> t != null
val deviceUuidHasValidCharacters: Predicate[String] =
deviceUuidHasValidCharactersPattern.asPredicate()
val validateDeviceUuid: Predicate[lang.String] = deviceUuidIsNonNull
and deviceUuidHasValidCharacters
val measurementIsNonNull: Predicate[lang.String] = (t: lang.String)
=> t != null
val measurementHasValidCharacters: Predicate[lang.String] =
measurementHasValidCharactersPattern.asPredicate()
val validateMeasurement: Predicate[lang.String] =
measurementIsNonNull and measurementHasValidCharacters
validateDeviceUuid.test(input.getDeviceUuid) &&
validateMeasurement.test(input.getMeasurement)
}
}
class IsRawTimeseries extends ProcessFunction[TimeSeries, lang.Boolean] {
override def apply(input: TimeSeries): lang.Boolean = {
input.getDownsampler == Downsampler.getDefaultInstance
}
}
class TimeseriesToIterableData extends InferableFunction[TimeSeries,
lang.Iterable[Data[Double]]] {
override def apply(input: TimeSeries): lang.Iterable[Data[Double]] = {
val deviceTypeAndId = input.getDeviceUuid.split("_")
val it = input
.getDataPointsList
.stream
.map[Data[Double]] { dp =>
Data(
deviceTypeAndId(0),
new Instant(dp.getTimestamp),
dp.getValue,
input.getMeasurement,
deviceTypeAndId(1).toLong
)
}.iterator()
new lang.Iterable[Data[Double]] {
override def iterator(): util.Iterator[Data[Double]] = it
}
}
}
}
import java.util.regex.Pattern
import com.google.api.services.bigquery.model.{Clustering, TableReference,
TimePartitioning}
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryHelpers,
TableDestination}
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.ValueInSingleWindow
class BigqueryDestinationFunc(project: String, datasetName: String)
extends SerializableFunction[ValueInSingleWindow[Data[Double]],
TableDestination] {
private val validTableNamePattern = Pattern.compile("[^a-zA-Z0-9]")
override def apply(input: ValueInSingleWindow[Data[Double]]):
TableDestination = {
new TableDestination(
BigQueryHelpers.toTableSpec(
new TableReference().setProjectId(
project
).setDatasetId(
datasetName
).setTableId(
s"${validTableNamePattern.matcher(input.getValue.measurement).replaceAll("").toLowerCase}_${validTableNamePattern.matcher(input.getValue.deviceType).replaceAll("").toLowerCase}"
)
),
"",
new
TimePartitioning().setRequirePartitionFilter(true).setField("timestamp").setType("DAY"),
new Clustering().setFields(java.util.Arrays.asList("device_id"))
)
}
}
def main(args: Array[String]): Unit = {
val options = PipelineOptionsFactory
.fromArgs(args: _*)
.withValidation()
.as(classOf[PipelineOptions])
val pipeline = Pipeline.create(options)
val project = options.getProject
val subscriptionName = options.getSubscriptionName
val datasetName = options.getDatasetName
val rows = pipeline.apply(
"read from pubsub",
PubsubIO.readProtos(
classOf[TimeSeries]
).fromSubscription(
s"projects/$project/subscriptions/$subscriptionName"
)
).apply(
"filter for raw",
Filter.by(new datapipeline.Timeseries.IsRawTimeseries)
).apply(
"filter for valid",
Filter.by(new datapipeline.Timeseries.IsValidTimeseries)
).apply(
"flatmap into rows",
FlatMapElements.via(new
datapipeline.Timeseries.TimeseriesToIterableData)
).setCoder(Data.coder).apply(
"write to bigquery",
BigQueryIO.write(
).withNumFileShards(
8
).withFormatFunction(
new Data.DataForDeviceMetricsRaw[Double]
).withSchema(
BigQueryUtils.toTableSchema(
Schema.of(
Schema.Field.of("device_id", Schema.FieldType.INT64),
Schema.Field.of("timestamp", Schema.FieldType.DATETIME),
Schema.Field.of("value", Schema.FieldType.DOUBLE)
)
)
).to(
new BigqueryDestinationFunc(project, datasetName)
).withClustering(
).optimizedWrites(
).withWriteDisposition(
WriteDisposition.WRITE_APPEND
).withCreateDisposition(
CreateDisposition.CREATE_IF_NEEDED
).withMethod(
BigQueryIO.Write.Method.FILE_LOADS
).withTriggeringFrequency(
Duration.standardMinutes(5)
)
)
pipeline.run()
/*
deployment as:
sbt "runMain
com.awair.iot.timeseries.datapipeline.bigquery.write.devicemetricsraw.DeviceMetricsRaw\
--maxNumWorkers=4\
--numWorkers=4\
--workerMachineType=n1-standard-4\
--runner=DataflowRunner\
--project=awair-prd\
--jobName=iot-time-series-datapipeline\
--subscriptionName=iot-time-series-datapipeline\
--datasetName=device_metrics_raw\
--region=us-central1\
--autoscalingAlgorithm=THROUGHPUT_BASED\
--workerDiskType=compute.googleapis.com/projects//zones//diskTypes/pd-ssd
--diskSizeGb=100\
--update"
*/
}
}
`
Protobuf to generate Timeseries type (java)
`syntax = "proto3";
package awair.iot.timeseries.v1beta1;
option java_package = "com.awair.iot.timeseries.v1beta1";
import "google/protobuf/empty.proto";
message TimeSeries {
string device_uuid = 1;
string measurement = 2;
repeated DataPoint data_points = 3;
Downsampler downsampler = 4;
}
message DataPoint {
int64 timestamp = 1;
double value = 2;
}
message Downsampler {
enum Method {
MAX = 0;
MIN = 1;
COUNT = 2;
SUM = 3;
AVG = 4;
}
Method method = 1;
int32 interval_seconds = 2;
}
service IotTimeSeriesService {
rpc GetTimeSeries (GetTimeSeriesRequest) returns (TimeSeries);
}
message GetTimeSeriesRequest {
string device_uuid = 1;
string measurement = 2;
Downsampler downsampler = 3;
int64 start_time = 4;
int64 end_time = 5;
int64 max_data_points = 6;
}`
If you want to do a live debugging in this project I can see about arranging
temporary access to our backend so that you can
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 314572)
Time Spent: 4h 40m (was: 4.5h)
> Streaming Pipeline throws RuntimeException when using DynamicDestinations and
> Method.FILE_LOADS
> -----------------------------------------------------------------------------------------------
>
> Key: BEAM-3200
> URL: https://issues.apache.org/jira/browse/BEAM-3200
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.2.0
> Reporter: AJ
> Priority: Critical
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> I am trying to use Method.FILE_LOADS for loading data into BQ in my streaming
> pipeline using RC3 release of 2.2.0. I am writing to around 500 tables using
> DynamicDestinations and I am also using
> withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED). Everything works
> fine when the first time bigquery load jobs get triggered. But on subsequent
> triggers pipeline throws a RuntimeException about table not found even though
> I created the pipeline with CreateDisposition.CREATE_IF_NEEDED. The exact
> exception is:
> {code}
> java.lang.RuntimeException: Failed to create load job with id prefix
> 717aed9ed1ef4aa7a616e1132f8b7f6d_a0928cae3d670b32f01ab2d9fe5cc0ee_00001_00001,
> reached max retries: 3, last failed load job: {
> "configuration" : {
> "load" : {
> "createDisposition" : "CREATE_NEVER",
> "destinationTable" : {
> "datasetId" : ...,
> "projectId" : ...,
> "tableId" : ....
> },
> "errors" : [ }
> "message" : "Not found: Table ....,
> "reason" : "notFound"
> } ],
> {code}
> My theory is all the subsequent load jobs get trigged using CREATE_NEVER
> disposition and
> this might be due to
> https://github.com/apache/beam/blob/release-2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L140
> When using DynamicDestinations all the destination tables might not be known
> during the first trigger and hence the pipeline's create disposition should
> be respected.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)