chamikaramj commented on code in PR #29731:
URL: https://github.com/apache/beam/pull/29731#discussion_r1425672058
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java:
##########
@@ -184,105 +189,124 @@ public Row toConfigRow(TypedRead<?> transform) {
@Override
public TypedRead<?> fromConfigRow(Row configRow) {
- BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
+ try {
+ BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
- String jsonTableRef = configRow.getString("json_table_ref");
- if (jsonTableRef != null) {
- builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
- }
- String query = configRow.getString("query");
- if (query != null) {
- builder = builder.setQuery(StaticValueProvider.of(query));
- }
- Boolean validate = configRow.getBoolean("validate");
- if (validate != null) {
- builder = builder.setValidate(validate);
- }
- Boolean flattenResults = configRow.getBoolean("flatten_results");
- if (flattenResults != null) {
- builder = builder.setFlattenResults(flattenResults);
- }
- Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
- if (useLegacySQL != null) {
- builder = builder.setUseLegacySql(useLegacySQL);
- }
- Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
- if (withTemplateCompatibility != null) {
- builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
- }
- byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
- if (bigqueryServicesBytes != null) {
- builder =
- builder.setBigQueryServices((BigQueryServices)
fromByteArray(bigqueryServicesBytes));
- }
- byte[] parseFnBytes = configRow.getBytes("parse_fn");
- if (parseFnBytes != null) {
- builder = builder.setParseFn((SerializableFunction)
fromByteArray(parseFnBytes));
- }
- byte[] datumReaderFactoryBytes =
configRow.getBytes("datum_reader_factory");
- if (datumReaderFactoryBytes != null) {
- builder =
- builder.setDatumReaderFactory(
- (SerializableFunction) fromByteArray(datumReaderFactoryBytes));
- }
- byte[] queryPriorityBytes = configRow.getBytes("query_priority");
- if (queryPriorityBytes != null) {
- builder = builder.setQueryPriority((QueryPriority)
fromByteArray(queryPriorityBytes));
- }
- String queryLocation = configRow.getString("query_location");
- if (queryLocation != null) {
- builder = builder.setQueryLocation(queryLocation);
- }
- String queryTempDataset = configRow.getString("query_temp_dataset");
- if (queryTempDataset != null) {
- builder = builder.setQueryTempDataset(queryTempDataset);
- }
- byte[] methodBytes = configRow.getBytes("method");
- if (methodBytes != null) {
- builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
- }
- byte[] formatBytes = configRow.getBytes("format");
- if (methodBytes != null) {
- builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
- }
- Collection<String> selectedFields =
configRow.getArray("selected_fields");
- if (selectedFields != null && !selectedFields.isEmpty()) {
-
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
- }
- String rowRestriction = configRow.getString("row_restriction");
- if (rowRestriction != null) {
- builder =
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
- }
- byte[] coderBytes = configRow.getBytes("coder");
- if (coderBytes != null) {
- builder = builder.setCoder((Coder) fromByteArray(coderBytes));
- }
- String kmsKey = configRow.getString("kms_key");
- if (kmsKey != null) {
- builder = builder.setKmsKey(kmsKey);
- }
- byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
- if (typeDescriptorBytes != null) {
- builder = builder.setTypeDescriptor((TypeDescriptor)
fromByteArray(typeDescriptorBytes));
- }
- byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
- if (toBeamRowFnBytes != null) {
- builder = builder.setToBeamRowFn((ToBeamRowFunction)
fromByteArray(toBeamRowFnBytes));
- }
- byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
- if (fromBeamRowFnBytes != null) {
- builder = builder.setFromBeamRowFn((FromBeamRowFunction)
fromByteArray(fromBeamRowFnBytes));
- }
- Boolean useAvroLogicalTypes =
configRow.getBoolean("use_avro_logical_types");
- if (useAvroLogicalTypes != null) {
- builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
- }
- Boolean projectionPushdownApplied =
configRow.getBoolean("projection_pushdown_applied");
- if (projectionPushdownApplied != null) {
- builder =
builder.setProjectionPushdownApplied(projectionPushdownApplied);
- }
+ String jsonTableRef = configRow.getString("json_table_ref");
+ if (jsonTableRef != null) {
+ builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
+ }
+ String query = configRow.getString("query");
+ if (query != null) {
+ builder = builder.setQuery(StaticValueProvider.of(query));
+ }
+ Boolean validate = configRow.getBoolean("validate");
+ if (validate != null) {
+ builder = builder.setValidate(validate);
+ }
+ Boolean flattenResults = configRow.getBoolean("flatten_results");
+ if (flattenResults != null) {
+ builder = builder.setFlattenResults(flattenResults);
+ }
+ Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
+ if (useLegacySQL != null) {
+ builder = builder.setUseLegacySql(useLegacySQL);
+ }
+ Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
+ if (withTemplateCompatibility != null) {
+ builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
+ }
+ byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
+ if (bigqueryServicesBytes != null) {
+ try {
+ builder =
+ builder.setBigQueryServices(
+ (BigQueryServices) fromByteArray(bigqueryServicesBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
+ "Could not use the provided `BigQueryServices` implementation
when upgrading."
+ + "Using the default.");
+ builder.setBigQueryServices(new BigQueryServicesImpl());
+ }
+ }
+ byte[] parseFnBytes = configRow.getBytes("parse_fn");
+ if (parseFnBytes != null) {
+ builder = builder.setParseFn((SerializableFunction)
fromByteArray(parseFnBytes));
+ }
+ byte[] datumReaderFactoryBytes =
configRow.getBytes("datum_reader_factory");
+ if (datumReaderFactoryBytes != null) {
+ builder =
+ builder.setDatumReaderFactory(
+ (SerializableFunction)
fromByteArray(datumReaderFactoryBytes));
+ }
+ byte[] queryPriorityBytes = configRow.getBytes("query_priority");
+ if (queryPriorityBytes != null) {
+ builder = builder.setQueryPriority((QueryPriority)
fromByteArray(queryPriorityBytes));
+ }
+ String queryLocation = configRow.getString("query_location");
+ if (queryLocation != null) {
+ builder = builder.setQueryLocation(queryLocation);
+ }
+ String queryTempDataset = configRow.getString("query_temp_dataset");
+ if (queryTempDataset != null) {
+ builder = builder.setQueryTempDataset(queryTempDataset);
+ }
+ byte[] methodBytes = configRow.getBytes("method");
+ if (methodBytes != null) {
+ builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
+ }
+ byte[] formatBytes = configRow.getBytes("format");
+ if (methodBytes != null) {
+ builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
+ }
+ Collection<String> selectedFields =
configRow.getArray("selected_fields");
+ if (selectedFields != null && !selectedFields.isEmpty()) {
+
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
+ }
+ String rowRestriction = configRow.getString("row_restriction");
+ if (rowRestriction != null) {
+ builder =
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
+ }
+ byte[] coderBytes = configRow.getBytes("coder");
+ if (coderBytes != null) {
+ try {
+ builder = builder.setCoder((Coder) fromByteArray(coderBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
Review Comment:
The default coder is set by the source transform if I don't set anything
here.
https://github.com/apache/beam/blob/951b3b1a81b340f358ce0d1bfe7017416b1763bd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1247
##########
sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java:
##########
@@ -203,162 +210,195 @@ public Row toConfigRow(Read<?, ?> transform) {
if (transform.getCheckStopReadingFn() != null) {
fieldValues.put("check_stop_reading_fn",
toByteArray(transform.getCheckStopReadingFn()));
}
+ if (transform.getBadRecordErrorHandler() != null) {
+ fieldValues.put(
+ "bad_record_error_handler",
toByteArray(transform.getBadRecordErrorHandler()));
+ }
return Row.withSchema(schema).withFieldValues(fieldValues).build();
}
@Override
public Read<?, ?> fromConfigRow(Row configRow) {
- Read<?, ?> transform = KafkaIO.read();
-
- Map<String, byte[]> consumerConfig = configRow.getMap("consumer_config");
- if (consumerConfig != null) {
- Map<String, Object> updatedConsumerConfig = new HashMap<>();
- consumerConfig.forEach(
- (key, dataBytes) -> {
- // Adding all allowed properties.
- if
(!KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(key)) {
- if (consumerConfig.get(key) == null) {
- throw new IllegalArgumentException(
- "Encoded value of the consumer config property " + key +
" was null");
+ try {
+ Read<?, ?> transform = KafkaIO.read();
+
+ Map<String, byte[]> consumerConfig =
configRow.getMap("consumer_config");
+ if (consumerConfig != null) {
+ Map<String, Object> updatedConsumerConfig = new HashMap<>();
+ consumerConfig.forEach(
+ (key, dataBytes) -> {
+ // Adding all allowed properties.
+ if
(!KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(key)) {
+ if (consumerConfig.get(key) == null) {
+ throw new IllegalArgumentException(
+ "Encoded value of the consumer config property " + key
+ " was null");
+ }
+ try {
+ updatedConsumerConfig.put(key,
fromByteArray(consumerConfig.get(key)));
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
}
- updatedConsumerConfig.put(key,
fromByteArray(consumerConfig.get(key)));
- }
- });
- transform = transform.withConsumerConfigUpdates(updatedConsumerConfig);
- }
- Collection<String> topics = configRow.getArray("topics");
- if (topics != null) {
- transform = transform.withTopics(new ArrayList<>(topics));
- }
- Collection<Row> topicPartitionRows =
configRow.getArray("topic_partitions");
- if (topicPartitionRows != null) {
- Collection<TopicPartition> topicPartitions =
- topicPartitionRows.stream()
- .map(
- row -> {
- String topic = row.getString("topic");
- if (topic == null) {
- throw new IllegalArgumentException("Expected the topic
to be not null");
- }
- Integer partition = row.getInt32("partition");
- if (partition == null) {
- throw new IllegalArgumentException("Expected the
partition to be not null");
- }
- return new TopicPartition(topic, partition);
- })
- .collect(Collectors.toList());
- transform =
transform.withTopicPartitions(Lists.newArrayList(topicPartitions));
- }
- String topicPattern = configRow.getString("topic_pattern");
- if (topicPattern != null) {
- transform = transform.withTopicPattern(topicPattern);
- }
+ });
+ transform =
transform.withConsumerConfigUpdates(updatedConsumerConfig);
+ }
+ Collection<String> topics = configRow.getArray("topics");
+ if (topics != null) {
+ transform = transform.withTopics(new ArrayList<>(topics));
+ }
+ Collection<Row> topicPartitionRows =
configRow.getArray("topic_partitions");
+ if (topicPartitionRows != null) {
+ Collection<TopicPartition> topicPartitions =
+ topicPartitionRows.stream()
+ .map(
+ row -> {
+ String topic = row.getString("topic");
+ if (topic == null) {
+ throw new IllegalArgumentException("Expected the
topic to be not null");
+ }
+ Integer partition = row.getInt32("partition");
+ if (partition == null) {
+ throw new IllegalArgumentException(
+ "Expected the partition to be not null");
+ }
+ return new TopicPartition(topic, partition);
+ })
+ .collect(Collectors.toList());
+ transform =
transform.withTopicPartitions(Lists.newArrayList(topicPartitions));
+ }
+ String topicPattern = configRow.getString("topic_pattern");
+ if (topicPattern != null) {
+ transform = transform.withTopicPattern(topicPattern);
+ }
- byte[] keyDeserializerProvider =
configRow.getBytes("key_deserializer_provider");
- if (keyDeserializerProvider != null) {
+ byte[] keyDeserializerProvider =
configRow.getBytes("key_deserializer_provider");
+ if (keyDeserializerProvider != null) {
+
+ byte[] keyCoder = configRow.getBytes("key_coder");
+ if (keyCoder != null) {
+ transform =
+ transform.withKeyDeserializerProviderAndCoder(
+ (DeserializerProvider)
fromByteArray(keyDeserializerProvider),
+ (org.apache.beam.sdk.coders.Coder)
fromByteArray(keyCoder));
+ } else {
+ transform =
+ transform.withKeyDeserializer(
+ (DeserializerProvider)
fromByteArray(keyDeserializerProvider));
+ }
+ }
+
+ byte[] valueDeserializerProvider =
configRow.getBytes("value_deserializer_provider");
+ if (valueDeserializerProvider != null) {
+ byte[] valueCoder = configRow.getBytes("value_coder");
+ if (valueCoder != null) {
+ transform =
+ transform.withValueDeserializerProviderAndCoder(
+ (DeserializerProvider)
fromByteArray(valueDeserializerProvider),
+ (org.apache.beam.sdk.coders.Coder)
fromByteArray(valueCoder));
+ } else {
+ transform =
+ transform.withValueDeserializer(
+ (DeserializerProvider)
fromByteArray(valueDeserializerProvider));
+ }
+ }
- byte[] keyCoder = configRow.getBytes("key_coder");
- if (keyCoder != null) {
+ byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn");
+ if (consumerFactoryFn != null) {
+ transform =
+ transform.withConsumerFactoryFn(
+ (SerializableFunction<Map<String, Object>, Consumer<byte[],
byte[]>>)
+ fromByteArray(consumerFactoryFn));
+ }
+ byte[] watermarkFn = configRow.getBytes("watermark_fn");
+ if (watermarkFn != null) {
+ transform = transform.withWatermarkFn2((SerializableFunction)
fromByteArray(watermarkFn));
+ }
+ Long maxNumRecords = configRow.getInt64("max_num_records");
+ if (maxNumRecords != null) {
+ transform = transform.withMaxNumRecords(maxNumRecords);
+ }
+ Duration maxReadTime = configRow.getValue("max_read_time");
+ if (maxReadTime != null) {
transform =
- transform.withKeyDeserializerProviderAndCoder(
- (DeserializerProvider)
fromByteArray(keyDeserializerProvider),
- (org.apache.beam.sdk.coders.Coder) fromByteArray(keyCoder));
- } else {
+
transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
+ }
+ Instant startReadTime = configRow.getValue("start_read_time");
+ if (startReadTime != null) {
+ transform = transform.withStartReadTime(startReadTime);
+ }
+ Instant stopReadTime = configRow.getValue("stop_read_time");
+ if (stopReadTime != null) {
+ transform = transform.withStopReadTime(stopReadTime);
+ }
+ Boolean isCommitOffsetFinalizeEnabled =
+ configRow.getBoolean("is_commit_offset_finalize_enabled");
+ if (isCommitOffsetFinalizeEnabled != null &&
isCommitOffsetFinalizeEnabled) {
+ transform = transform.commitOffsetsInFinalize();
+ }
+ Boolean isDynamicRead = configRow.getBoolean("is_dynamic_read");
+ if (isDynamicRead != null && isDynamicRead) {
+ Duration watchTopicPartitionDuration =
+ configRow.getValue("watch_topic_partition_duration");
+ if (watchTopicPartitionDuration == null) {
+ throw new IllegalArgumentException(
+ "Expected watchTopicPartitionDuration to be available when
isDynamicRead is set to true");
+ }
transform =
- transform.withKeyDeserializer(
- (DeserializerProvider)
fromByteArray(keyDeserializerProvider));
+ transform.withDynamicRead(
+
org.joda.time.Duration.millis(watchTopicPartitionDuration.toMillis()));
}
- }
- byte[] valueDeserializerProvider =
configRow.getBytes("value_deserializer_provider");
- if (valueDeserializerProvider != null) {
- byte[] valueCoder = configRow.getBytes("value_coder");
- if (valueCoder != null) {
- transform =
- transform.withValueDeserializerProviderAndCoder(
- (DeserializerProvider)
fromByteArray(valueDeserializerProvider),
- (org.apache.beam.sdk.coders.Coder)
fromByteArray(valueCoder));
- } else {
+ byte[] timestampPolicyFactory =
configRow.getBytes("timestamp_policy_factory");
+ if (timestampPolicyFactory != null) {
transform =
- transform.withValueDeserializer(
- (DeserializerProvider)
fromByteArray(valueDeserializerProvider));
+ transform.withTimestampPolicyFactory(
+ (TimestampPolicyFactory)
fromByteArray(timestampPolicyFactory));
+ }
+ Map<String, byte[]> offsetConsumerConfig =
configRow.getMap("offset_consumer_config");
+ if (offsetConsumerConfig != null) {
+ Map<String, Object> updatedOffsetConsumerConfig = new HashMap<>();
+ offsetConsumerConfig.forEach(
+ (key, dataBytes) -> {
+ if (offsetConsumerConfig.get(key) == null) {
+ throw new IllegalArgumentException(
+ "Encoded value for the offset consumer config key " +
key + " was null.");
+ }
+ try {
+ updatedOffsetConsumerConfig.put(
+ key, fromByteArray(offsetConsumerConfig.get(key)));
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ transform =
transform.withOffsetConsumerConfigOverrides(updatedOffsetConsumerConfig);
}
- }
- byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn");
- if (consumerFactoryFn != null) {
- transform =
- transform.withConsumerFactoryFn(
- (SerializableFunction<Map<String, Object>, Consumer<byte[],
byte[]>>)
- fromByteArray(consumerFactoryFn));
- }
- byte[] watermarkFn = configRow.getBytes("watermark_fn");
- if (watermarkFn != null) {
- transform = transform.withWatermarkFn2((SerializableFunction)
fromByteArray(watermarkFn));
- }
- Long maxNumRecords = configRow.getInt64("max_num_records");
- if (maxNumRecords != null) {
- transform = transform.withMaxNumRecords(maxNumRecords);
- }
- Duration maxReadTime = configRow.getValue("max_read_time");
- if (maxReadTime != null) {
- transform =
-
transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
- }
- Instant startReadTime = configRow.getValue("start_read_time");
- if (startReadTime != null) {
- transform = transform.withStartReadTime(startReadTime);
- }
- Instant stopReadTime = configRow.getValue("stop_read_time");
- if (stopReadTime != null) {
- transform = transform.withStopReadTime(stopReadTime);
- }
- Boolean isCommitOffsetFinalizeEnabled =
- configRow.getBoolean("is_commit_offset_finalize_enabled");
- if (isCommitOffsetFinalizeEnabled != null &&
isCommitOffsetFinalizeEnabled) {
- transform = transform.commitOffsetsInFinalize();
- }
- Boolean isDynamicRead = configRow.getBoolean("is_dynamic_read");
- if (isDynamicRead != null && isDynamicRead) {
- Duration watchTopicPartitionDuration =
configRow.getValue("watch_topic_partition_duration");
- if (watchTopicPartitionDuration == null) {
- throw new IllegalArgumentException(
- "Expected watchTopicPartitionDuration to be available when
isDynamicRead is set to true");
+ byte[] checkStopReadinfFn =
configRow.getBytes("check_stop_reading_fn");
+ if (checkStopReadinfFn != null) {
+ transform =
+ transform.withCheckStopReadingFn(
+ (SerializableFunction<TopicPartition, Boolean>)
+ fromByteArray(checkStopReadinfFn));
}
- transform =
- transform.withDynamicRead(
-
org.joda.time.Duration.millis(watchTopicPartitionDuration.toMillis()));
- }
- byte[] timestampPolicyFactory =
configRow.getBytes("timestamp_policy_factory");
- if (timestampPolicyFactory != null) {
- transform =
- transform.withTimestampPolicyFactory(
- (TimestampPolicyFactory)
fromByteArray(timestampPolicyFactory));
- }
- Map<String, byte[]> offsetConsumerConfig =
configRow.getMap("offset_consumer_config");
- if (offsetConsumerConfig != null) {
- Map<String, Object> updatedOffsetConsumerConfig = new HashMap<>();
- offsetConsumerConfig.forEach(
- (key, dataBytes) -> {
- if (offsetConsumerConfig.get(key) == null) {
- throw new IllegalArgumentException(
- "Encoded value for the offset consumer config key " + key
+ " was null.");
- }
- updatedOffsetConsumerConfig.put(key,
fromByteArray(offsetConsumerConfig.get(key)));
- });
- transform =
transform.withOffsetConsumerConfigOverrides(updatedOffsetConsumerConfig);
- }
+ byte[] badRecordErrorHandlerBytes =
configRow.getBytes("bad_record_error_handler");
+ if (badRecordErrorHandlerBytes != null) {
+ try {
+ transform =
+ transform.withBadRecordErrorHandler(
+ (ErrorHandler) fromByteArray(badRecordErrorHandlerBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
+ "Could not use the provided `ErrorHandler` implementation when
upgrading."
+ + "Using the default.");
+ }
Review Comment:
I think ErrorHandler has to be instantiated with its own schema, but I will do
this in a separate PR.
For now, I am updating the code so that we hard-fail if the user has set this
property.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java:
##########
@@ -184,105 +189,124 @@ public Row toConfigRow(TypedRead<?> transform) {
@Override
public TypedRead<?> fromConfigRow(Row configRow) {
- BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
+ try {
+ BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
- String jsonTableRef = configRow.getString("json_table_ref");
- if (jsonTableRef != null) {
- builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
- }
- String query = configRow.getString("query");
- if (query != null) {
- builder = builder.setQuery(StaticValueProvider.of(query));
- }
- Boolean validate = configRow.getBoolean("validate");
- if (validate != null) {
- builder = builder.setValidate(validate);
- }
- Boolean flattenResults = configRow.getBoolean("flatten_results");
- if (flattenResults != null) {
- builder = builder.setFlattenResults(flattenResults);
- }
- Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
- if (useLegacySQL != null) {
- builder = builder.setUseLegacySql(useLegacySQL);
- }
- Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
- if (withTemplateCompatibility != null) {
- builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
- }
- byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
- if (bigqueryServicesBytes != null) {
- builder =
- builder.setBigQueryServices((BigQueryServices)
fromByteArray(bigqueryServicesBytes));
- }
- byte[] parseFnBytes = configRow.getBytes("parse_fn");
- if (parseFnBytes != null) {
- builder = builder.setParseFn((SerializableFunction)
fromByteArray(parseFnBytes));
- }
- byte[] datumReaderFactoryBytes =
configRow.getBytes("datum_reader_factory");
- if (datumReaderFactoryBytes != null) {
- builder =
- builder.setDatumReaderFactory(
- (SerializableFunction) fromByteArray(datumReaderFactoryBytes));
- }
- byte[] queryPriorityBytes = configRow.getBytes("query_priority");
- if (queryPriorityBytes != null) {
- builder = builder.setQueryPriority((QueryPriority)
fromByteArray(queryPriorityBytes));
- }
- String queryLocation = configRow.getString("query_location");
- if (queryLocation != null) {
- builder = builder.setQueryLocation(queryLocation);
- }
- String queryTempDataset = configRow.getString("query_temp_dataset");
- if (queryTempDataset != null) {
- builder = builder.setQueryTempDataset(queryTempDataset);
- }
- byte[] methodBytes = configRow.getBytes("method");
- if (methodBytes != null) {
- builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
- }
- byte[] formatBytes = configRow.getBytes("format");
- if (methodBytes != null) {
- builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
- }
- Collection<String> selectedFields =
configRow.getArray("selected_fields");
- if (selectedFields != null && !selectedFields.isEmpty()) {
-
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
- }
- String rowRestriction = configRow.getString("row_restriction");
- if (rowRestriction != null) {
- builder =
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
- }
- byte[] coderBytes = configRow.getBytes("coder");
- if (coderBytes != null) {
- builder = builder.setCoder((Coder) fromByteArray(coderBytes));
- }
- String kmsKey = configRow.getString("kms_key");
- if (kmsKey != null) {
- builder = builder.setKmsKey(kmsKey);
- }
- byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
- if (typeDescriptorBytes != null) {
- builder = builder.setTypeDescriptor((TypeDescriptor)
fromByteArray(typeDescriptorBytes));
- }
- byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
- if (toBeamRowFnBytes != null) {
- builder = builder.setToBeamRowFn((ToBeamRowFunction)
fromByteArray(toBeamRowFnBytes));
- }
- byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
- if (fromBeamRowFnBytes != null) {
- builder = builder.setFromBeamRowFn((FromBeamRowFunction)
fromByteArray(fromBeamRowFnBytes));
- }
- Boolean useAvroLogicalTypes =
configRow.getBoolean("use_avro_logical_types");
- if (useAvroLogicalTypes != null) {
- builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
- }
- Boolean projectionPushdownApplied =
configRow.getBoolean("projection_pushdown_applied");
- if (projectionPushdownApplied != null) {
- builder =
builder.setProjectionPushdownApplied(projectionPushdownApplied);
- }
+ String jsonTableRef = configRow.getString("json_table_ref");
+ if (jsonTableRef != null) {
+ builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
+ }
+ String query = configRow.getString("query");
+ if (query != null) {
+ builder = builder.setQuery(StaticValueProvider.of(query));
+ }
+ Boolean validate = configRow.getBoolean("validate");
+ if (validate != null) {
+ builder = builder.setValidate(validate);
+ }
+ Boolean flattenResults = configRow.getBoolean("flatten_results");
+ if (flattenResults != null) {
+ builder = builder.setFlattenResults(flattenResults);
+ }
+ Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
+ if (useLegacySQL != null) {
+ builder = builder.setUseLegacySql(useLegacySQL);
+ }
+ Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
+ if (withTemplateCompatibility != null) {
+ builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
+ }
+ byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
+ if (bigqueryServicesBytes != null) {
+ try {
+ builder =
+ builder.setBigQueryServices(
+ (BigQueryServices) fromByteArray(bigqueryServicesBytes));
+ } catch (InvalidClassException e) {
Review Comment:
I mentioned the field in the log (and it should also appear in the exception
stack trace). I only handle specific cases here. If deserialization fails in
any other case, it will be a hard failure during job submission.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java:
##########
@@ -184,105 +189,124 @@ public Row toConfigRow(TypedRead<?> transform) {
@Override
public TypedRead<?> fromConfigRow(Row configRow) {
- BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
+ try {
+ BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
- String jsonTableRef = configRow.getString("json_table_ref");
- if (jsonTableRef != null) {
- builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
- }
- String query = configRow.getString("query");
- if (query != null) {
- builder = builder.setQuery(StaticValueProvider.of(query));
- }
- Boolean validate = configRow.getBoolean("validate");
- if (validate != null) {
- builder = builder.setValidate(validate);
- }
- Boolean flattenResults = configRow.getBoolean("flatten_results");
- if (flattenResults != null) {
- builder = builder.setFlattenResults(flattenResults);
- }
- Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
- if (useLegacySQL != null) {
- builder = builder.setUseLegacySql(useLegacySQL);
- }
- Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
- if (withTemplateCompatibility != null) {
- builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
- }
- byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
- if (bigqueryServicesBytes != null) {
- builder =
- builder.setBigQueryServices((BigQueryServices)
fromByteArray(bigqueryServicesBytes));
- }
- byte[] parseFnBytes = configRow.getBytes("parse_fn");
- if (parseFnBytes != null) {
- builder = builder.setParseFn((SerializableFunction)
fromByteArray(parseFnBytes));
- }
- byte[] datumReaderFactoryBytes =
configRow.getBytes("datum_reader_factory");
- if (datumReaderFactoryBytes != null) {
- builder =
- builder.setDatumReaderFactory(
- (SerializableFunction) fromByteArray(datumReaderFactoryBytes));
- }
- byte[] queryPriorityBytes = configRow.getBytes("query_priority");
- if (queryPriorityBytes != null) {
- builder = builder.setQueryPriority((QueryPriority)
fromByteArray(queryPriorityBytes));
- }
- String queryLocation = configRow.getString("query_location");
- if (queryLocation != null) {
- builder = builder.setQueryLocation(queryLocation);
- }
- String queryTempDataset = configRow.getString("query_temp_dataset");
- if (queryTempDataset != null) {
- builder = builder.setQueryTempDataset(queryTempDataset);
- }
- byte[] methodBytes = configRow.getBytes("method");
- if (methodBytes != null) {
- builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
- }
- byte[] formatBytes = configRow.getBytes("format");
- if (methodBytes != null) {
- builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
- }
- Collection<String> selectedFields =
configRow.getArray("selected_fields");
- if (selectedFields != null && !selectedFields.isEmpty()) {
-
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
- }
- String rowRestriction = configRow.getString("row_restriction");
- if (rowRestriction != null) {
- builder =
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
- }
- byte[] coderBytes = configRow.getBytes("coder");
- if (coderBytes != null) {
- builder = builder.setCoder((Coder) fromByteArray(coderBytes));
- }
- String kmsKey = configRow.getString("kms_key");
- if (kmsKey != null) {
- builder = builder.setKmsKey(kmsKey);
- }
- byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
- if (typeDescriptorBytes != null) {
- builder = builder.setTypeDescriptor((TypeDescriptor)
fromByteArray(typeDescriptorBytes));
- }
- byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
- if (toBeamRowFnBytes != null) {
- builder = builder.setToBeamRowFn((ToBeamRowFunction)
fromByteArray(toBeamRowFnBytes));
- }
- byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
- if (fromBeamRowFnBytes != null) {
- builder = builder.setFromBeamRowFn((FromBeamRowFunction)
fromByteArray(fromBeamRowFnBytes));
- }
- Boolean useAvroLogicalTypes =
configRow.getBoolean("use_avro_logical_types");
- if (useAvroLogicalTypes != null) {
- builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
- }
- Boolean projectionPushdownApplied =
configRow.getBoolean("projection_pushdown_applied");
- if (projectionPushdownApplied != null) {
- builder =
builder.setProjectionPushdownApplied(projectionPushdownApplied);
- }
+ String jsonTableRef = configRow.getString("json_table_ref");
+ if (jsonTableRef != null) {
+ builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
+ }
+ String query = configRow.getString("query");
+ if (query != null) {
+ builder = builder.setQuery(StaticValueProvider.of(query));
+ }
+ Boolean validate = configRow.getBoolean("validate");
+ if (validate != null) {
+ builder = builder.setValidate(validate);
+ }
+ Boolean flattenResults = configRow.getBoolean("flatten_results");
+ if (flattenResults != null) {
+ builder = builder.setFlattenResults(flattenResults);
+ }
+ Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
+ if (useLegacySQL != null) {
+ builder = builder.setUseLegacySql(useLegacySQL);
+ }
+ Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
+ if (withTemplateCompatibility != null) {
+ builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
+ }
+ byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
+ if (bigqueryServicesBytes != null) {
+ try {
+ builder =
+ builder.setBigQueryServices(
+ (BigQueryServices) fromByteArray(bigqueryServicesBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
+ "Could not use the provided `BigQueryServices` implementation
when upgrading."
+ + "Using the default.");
+ builder.setBigQueryServices(new BigQueryServicesImpl());
Review Comment:
Unfortunately, this object is set by default for BQ source/sink transforms,
and BQ transforms cannot be constructed without it. See the links below.
https://github.com/apache/beam/blob/951b3b1a81b340f358ce0d1bfe7017416b1763bd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L724
https://github.com/apache/beam/blob/951b3b1a81b340f358ce0d1bfe7017416b1763bd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2127
So I have to set it here in order to rebuild the source/sink transform object.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]