johnjcasey commented on code in PR #29731:
URL: https://github.com/apache/beam/pull/29731#discussion_r1425582331
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java:
##########
@@ -184,105 +189,124 @@ public Row toConfigRow(TypedRead<?> transform) {
@Override
public TypedRead<?> fromConfigRow(Row configRow) {
- BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
+ try {
+ BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
- String jsonTableRef = configRow.getString("json_table_ref");
- if (jsonTableRef != null) {
- builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
- }
- String query = configRow.getString("query");
- if (query != null) {
- builder = builder.setQuery(StaticValueProvider.of(query));
- }
- Boolean validate = configRow.getBoolean("validate");
- if (validate != null) {
- builder = builder.setValidate(validate);
- }
- Boolean flattenResults = configRow.getBoolean("flatten_results");
- if (flattenResults != null) {
- builder = builder.setFlattenResults(flattenResults);
- }
- Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
- if (useLegacySQL != null) {
- builder = builder.setUseLegacySql(useLegacySQL);
- }
- Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
- if (withTemplateCompatibility != null) {
- builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
- }
- byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
- if (bigqueryServicesBytes != null) {
- builder =
- builder.setBigQueryServices((BigQueryServices)
fromByteArray(bigqueryServicesBytes));
- }
- byte[] parseFnBytes = configRow.getBytes("parse_fn");
- if (parseFnBytes != null) {
- builder = builder.setParseFn((SerializableFunction)
fromByteArray(parseFnBytes));
- }
- byte[] datumReaderFactoryBytes =
configRow.getBytes("datum_reader_factory");
- if (datumReaderFactoryBytes != null) {
- builder =
- builder.setDatumReaderFactory(
- (SerializableFunction) fromByteArray(datumReaderFactoryBytes));
- }
- byte[] queryPriorityBytes = configRow.getBytes("query_priority");
- if (queryPriorityBytes != null) {
- builder = builder.setQueryPriority((QueryPriority)
fromByteArray(queryPriorityBytes));
- }
- String queryLocation = configRow.getString("query_location");
- if (queryLocation != null) {
- builder = builder.setQueryLocation(queryLocation);
- }
- String queryTempDataset = configRow.getString("query_temp_dataset");
- if (queryTempDataset != null) {
- builder = builder.setQueryTempDataset(queryTempDataset);
- }
- byte[] methodBytes = configRow.getBytes("method");
- if (methodBytes != null) {
- builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
- }
- byte[] formatBytes = configRow.getBytes("format");
- if (methodBytes != null) {
- builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
- }
- Collection<String> selectedFields =
configRow.getArray("selected_fields");
- if (selectedFields != null && !selectedFields.isEmpty()) {
-
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
- }
- String rowRestriction = configRow.getString("row_restriction");
- if (rowRestriction != null) {
- builder =
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
- }
- byte[] coderBytes = configRow.getBytes("coder");
- if (coderBytes != null) {
- builder = builder.setCoder((Coder) fromByteArray(coderBytes));
- }
- String kmsKey = configRow.getString("kms_key");
- if (kmsKey != null) {
- builder = builder.setKmsKey(kmsKey);
- }
- byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
- if (typeDescriptorBytes != null) {
- builder = builder.setTypeDescriptor((TypeDescriptor)
fromByteArray(typeDescriptorBytes));
- }
- byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
- if (toBeamRowFnBytes != null) {
- builder = builder.setToBeamRowFn((ToBeamRowFunction)
fromByteArray(toBeamRowFnBytes));
- }
- byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
- if (fromBeamRowFnBytes != null) {
- builder = builder.setFromBeamRowFn((FromBeamRowFunction)
fromByteArray(fromBeamRowFnBytes));
- }
- Boolean useAvroLogicalTypes =
configRow.getBoolean("use_avro_logical_types");
- if (useAvroLogicalTypes != null) {
- builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
- }
- Boolean projectionPushdownApplied =
configRow.getBoolean("projection_pushdown_applied");
- if (projectionPushdownApplied != null) {
- builder =
builder.setProjectionPushdownApplied(projectionPushdownApplied);
- }
+ String jsonTableRef = configRow.getString("json_table_ref");
+ if (jsonTableRef != null) {
+ builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
+ }
+ String query = configRow.getString("query");
+ if (query != null) {
+ builder = builder.setQuery(StaticValueProvider.of(query));
+ }
+ Boolean validate = configRow.getBoolean("validate");
+ if (validate != null) {
+ builder = builder.setValidate(validate);
+ }
+ Boolean flattenResults = configRow.getBoolean("flatten_results");
+ if (flattenResults != null) {
+ builder = builder.setFlattenResults(flattenResults);
+ }
+ Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
+ if (useLegacySQL != null) {
+ builder = builder.setUseLegacySql(useLegacySQL);
+ }
+ Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
+ if (withTemplateCompatibility != null) {
+ builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
+ }
+ byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
+ if (bigqueryServicesBytes != null) {
+ try {
+ builder =
+ builder.setBigQueryServices(
+ (BigQueryServices) fromByteArray(bigqueryServicesBytes));
+ } catch (InvalidClassException e) {
Review Comment:
How would one know which fields could fail to deserialize?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java:
##########
@@ -184,105 +189,124 @@ public Row toConfigRow(TypedRead<?> transform) {
@Override
public TypedRead<?> fromConfigRow(Row configRow) {
- BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
+ try {
+ BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
- String jsonTableRef = configRow.getString("json_table_ref");
- if (jsonTableRef != null) {
- builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
- }
- String query = configRow.getString("query");
- if (query != null) {
- builder = builder.setQuery(StaticValueProvider.of(query));
- }
- Boolean validate = configRow.getBoolean("validate");
- if (validate != null) {
- builder = builder.setValidate(validate);
- }
- Boolean flattenResults = configRow.getBoolean("flatten_results");
- if (flattenResults != null) {
- builder = builder.setFlattenResults(flattenResults);
- }
- Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
- if (useLegacySQL != null) {
- builder = builder.setUseLegacySql(useLegacySQL);
- }
- Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
- if (withTemplateCompatibility != null) {
- builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
- }
- byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
- if (bigqueryServicesBytes != null) {
- builder =
- builder.setBigQueryServices((BigQueryServices)
fromByteArray(bigqueryServicesBytes));
- }
- byte[] parseFnBytes = configRow.getBytes("parse_fn");
- if (parseFnBytes != null) {
- builder = builder.setParseFn((SerializableFunction)
fromByteArray(parseFnBytes));
- }
- byte[] datumReaderFactoryBytes =
configRow.getBytes("datum_reader_factory");
- if (datumReaderFactoryBytes != null) {
- builder =
- builder.setDatumReaderFactory(
- (SerializableFunction) fromByteArray(datumReaderFactoryBytes));
- }
- byte[] queryPriorityBytes = configRow.getBytes("query_priority");
- if (queryPriorityBytes != null) {
- builder = builder.setQueryPriority((QueryPriority)
fromByteArray(queryPriorityBytes));
- }
- String queryLocation = configRow.getString("query_location");
- if (queryLocation != null) {
- builder = builder.setQueryLocation(queryLocation);
- }
- String queryTempDataset = configRow.getString("query_temp_dataset");
- if (queryTempDataset != null) {
- builder = builder.setQueryTempDataset(queryTempDataset);
- }
- byte[] methodBytes = configRow.getBytes("method");
- if (methodBytes != null) {
- builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
- }
- byte[] formatBytes = configRow.getBytes("format");
- if (methodBytes != null) {
- builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
- }
- Collection<String> selectedFields =
configRow.getArray("selected_fields");
- if (selectedFields != null && !selectedFields.isEmpty()) {
-
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
- }
- String rowRestriction = configRow.getString("row_restriction");
- if (rowRestriction != null) {
- builder =
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
- }
- byte[] coderBytes = configRow.getBytes("coder");
- if (coderBytes != null) {
- builder = builder.setCoder((Coder) fromByteArray(coderBytes));
- }
- String kmsKey = configRow.getString("kms_key");
- if (kmsKey != null) {
- builder = builder.setKmsKey(kmsKey);
- }
- byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
- if (typeDescriptorBytes != null) {
- builder = builder.setTypeDescriptor((TypeDescriptor)
fromByteArray(typeDescriptorBytes));
- }
- byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
- if (toBeamRowFnBytes != null) {
- builder = builder.setToBeamRowFn((ToBeamRowFunction)
fromByteArray(toBeamRowFnBytes));
- }
- byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
- if (fromBeamRowFnBytes != null) {
- builder = builder.setFromBeamRowFn((FromBeamRowFunction)
fromByteArray(fromBeamRowFnBytes));
- }
- Boolean useAvroLogicalTypes =
configRow.getBoolean("use_avro_logical_types");
- if (useAvroLogicalTypes != null) {
- builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
- }
- Boolean projectionPushdownApplied =
configRow.getBoolean("projection_pushdown_applied");
- if (projectionPushdownApplied != null) {
- builder =
builder.setProjectionPushdownApplied(projectionPushdownApplied);
- }
+ String jsonTableRef = configRow.getString("json_table_ref");
+ if (jsonTableRef != null) {
+ builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
+ }
+ String query = configRow.getString("query");
+ if (query != null) {
+ builder = builder.setQuery(StaticValueProvider.of(query));
+ }
+ Boolean validate = configRow.getBoolean("validate");
+ if (validate != null) {
+ builder = builder.setValidate(validate);
+ }
+ Boolean flattenResults = configRow.getBoolean("flatten_results");
+ if (flattenResults != null) {
+ builder = builder.setFlattenResults(flattenResults);
+ }
+ Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
+ if (useLegacySQL != null) {
+ builder = builder.setUseLegacySql(useLegacySQL);
+ }
+ Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
+ if (withTemplateCompatibility != null) {
+ builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
+ }
+ byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
+ if (bigqueryServicesBytes != null) {
+ try {
+ builder =
+ builder.setBigQueryServices(
+ (BigQueryServices) fromByteArray(bigqueryServicesBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
+ "Could not use the provided `BigQueryServices` implementation
when upgrading."
+ + "Using the default.");
+ builder.setBigQueryServices(new BigQueryServicesImpl());
Review Comment:
If this is sufficient for correct behavior, why do we deserialize the
service bytes at all?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java:
##########
@@ -184,105 +189,124 @@ public Row toConfigRow(TypedRead<?> transform) {
@Override
public TypedRead<?> fromConfigRow(Row configRow) {
- BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
+ try {
+ BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
- String jsonTableRef = configRow.getString("json_table_ref");
- if (jsonTableRef != null) {
- builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
- }
- String query = configRow.getString("query");
- if (query != null) {
- builder = builder.setQuery(StaticValueProvider.of(query));
- }
- Boolean validate = configRow.getBoolean("validate");
- if (validate != null) {
- builder = builder.setValidate(validate);
- }
- Boolean flattenResults = configRow.getBoolean("flatten_results");
- if (flattenResults != null) {
- builder = builder.setFlattenResults(flattenResults);
- }
- Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
- if (useLegacySQL != null) {
- builder = builder.setUseLegacySql(useLegacySQL);
- }
- Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
- if (withTemplateCompatibility != null) {
- builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
- }
- byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
- if (bigqueryServicesBytes != null) {
- builder =
- builder.setBigQueryServices((BigQueryServices)
fromByteArray(bigqueryServicesBytes));
- }
- byte[] parseFnBytes = configRow.getBytes("parse_fn");
- if (parseFnBytes != null) {
- builder = builder.setParseFn((SerializableFunction)
fromByteArray(parseFnBytes));
- }
- byte[] datumReaderFactoryBytes =
configRow.getBytes("datum_reader_factory");
- if (datumReaderFactoryBytes != null) {
- builder =
- builder.setDatumReaderFactory(
- (SerializableFunction) fromByteArray(datumReaderFactoryBytes));
- }
- byte[] queryPriorityBytes = configRow.getBytes("query_priority");
- if (queryPriorityBytes != null) {
- builder = builder.setQueryPriority((QueryPriority)
fromByteArray(queryPriorityBytes));
- }
- String queryLocation = configRow.getString("query_location");
- if (queryLocation != null) {
- builder = builder.setQueryLocation(queryLocation);
- }
- String queryTempDataset = configRow.getString("query_temp_dataset");
- if (queryTempDataset != null) {
- builder = builder.setQueryTempDataset(queryTempDataset);
- }
- byte[] methodBytes = configRow.getBytes("method");
- if (methodBytes != null) {
- builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
- }
- byte[] formatBytes = configRow.getBytes("format");
- if (methodBytes != null) {
- builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
- }
- Collection<String> selectedFields =
configRow.getArray("selected_fields");
- if (selectedFields != null && !selectedFields.isEmpty()) {
-
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
- }
- String rowRestriction = configRow.getString("row_restriction");
- if (rowRestriction != null) {
- builder =
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
- }
- byte[] coderBytes = configRow.getBytes("coder");
- if (coderBytes != null) {
- builder = builder.setCoder((Coder) fromByteArray(coderBytes));
- }
- String kmsKey = configRow.getString("kms_key");
- if (kmsKey != null) {
- builder = builder.setKmsKey(kmsKey);
- }
- byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
- if (typeDescriptorBytes != null) {
- builder = builder.setTypeDescriptor((TypeDescriptor)
fromByteArray(typeDescriptorBytes));
- }
- byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
- if (toBeamRowFnBytes != null) {
- builder = builder.setToBeamRowFn((ToBeamRowFunction)
fromByteArray(toBeamRowFnBytes));
- }
- byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
- if (fromBeamRowFnBytes != null) {
- builder = builder.setFromBeamRowFn((FromBeamRowFunction)
fromByteArray(fromBeamRowFnBytes));
- }
- Boolean useAvroLogicalTypes =
configRow.getBoolean("use_avro_logical_types");
- if (useAvroLogicalTypes != null) {
- builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
- }
- Boolean projectionPushdownApplied =
configRow.getBoolean("projection_pushdown_applied");
- if (projectionPushdownApplied != null) {
- builder =
builder.setProjectionPushdownApplied(projectionPushdownApplied);
- }
+ String jsonTableRef = configRow.getString("json_table_ref");
+ if (jsonTableRef != null) {
+ builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
+ }
+ String query = configRow.getString("query");
+ if (query != null) {
+ builder = builder.setQuery(StaticValueProvider.of(query));
+ }
+ Boolean validate = configRow.getBoolean("validate");
+ if (validate != null) {
+ builder = builder.setValidate(validate);
+ }
+ Boolean flattenResults = configRow.getBoolean("flatten_results");
+ if (flattenResults != null) {
+ builder = builder.setFlattenResults(flattenResults);
+ }
+ Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
+ if (useLegacySQL != null) {
+ builder = builder.setUseLegacySql(useLegacySQL);
+ }
+ Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
+ if (withTemplateCompatibility != null) {
+ builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
+ }
+ byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
+ if (bigqueryServicesBytes != null) {
+ try {
+ builder =
+ builder.setBigQueryServices(
+ (BigQueryServices) fromByteArray(bigqueryServicesBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
+ "Could not use the provided `BigQueryServices` implementation
when upgrading."
+ + "Using the default.");
+ builder.setBigQueryServices(new BigQueryServicesImpl());
+ }
+ }
+ byte[] parseFnBytes = configRow.getBytes("parse_fn");
+ if (parseFnBytes != null) {
+ builder = builder.setParseFn((SerializableFunction)
fromByteArray(parseFnBytes));
+ }
+ byte[] datumReaderFactoryBytes =
configRow.getBytes("datum_reader_factory");
+ if (datumReaderFactoryBytes != null) {
+ builder =
+ builder.setDatumReaderFactory(
+ (SerializableFunction)
fromByteArray(datumReaderFactoryBytes));
+ }
+ byte[] queryPriorityBytes = configRow.getBytes("query_priority");
+ if (queryPriorityBytes != null) {
+ builder = builder.setQueryPriority((QueryPriority)
fromByteArray(queryPriorityBytes));
+ }
+ String queryLocation = configRow.getString("query_location");
+ if (queryLocation != null) {
+ builder = builder.setQueryLocation(queryLocation);
+ }
+ String queryTempDataset = configRow.getString("query_temp_dataset");
+ if (queryTempDataset != null) {
+ builder = builder.setQueryTempDataset(queryTempDataset);
+ }
+ byte[] methodBytes = configRow.getBytes("method");
+ if (methodBytes != null) {
+ builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
+ }
+ byte[] formatBytes = configRow.getBytes("format");
+ if (methodBytes != null) {
+ builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
+ }
+ Collection<String> selectedFields =
configRow.getArray("selected_fields");
+ if (selectedFields != null && !selectedFields.isEmpty()) {
+
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
+ }
+ String rowRestriction = configRow.getString("row_restriction");
+ if (rowRestriction != null) {
+ builder =
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
+ }
+ byte[] coderBytes = configRow.getBytes("coder");
+ if (coderBytes != null) {
+ try {
+ builder = builder.setCoder((Coder) fromByteArray(coderBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
Review Comment:
It doesn't look like you actually set the default coder here.
##########
sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java:
##########
@@ -203,162 +210,195 @@ public Row toConfigRow(Read<?, ?> transform) {
if (transform.getCheckStopReadingFn() != null) {
fieldValues.put("check_stop_reading_fn",
toByteArray(transform.getCheckStopReadingFn()));
}
+ if (transform.getBadRecordErrorHandler() != null) {
+ fieldValues.put(
+ "bad_record_error_handler",
toByteArray(transform.getBadRecordErrorHandler()));
+ }
return Row.withSchema(schema).withFieldValues(fieldValues).build();
}
@Override
public Read<?, ?> fromConfigRow(Row configRow) {
- Read<?, ?> transform = KafkaIO.read();
-
- Map<String, byte[]> consumerConfig = configRow.getMap("consumer_config");
- if (consumerConfig != null) {
- Map<String, Object> updatedConsumerConfig = new HashMap<>();
- consumerConfig.forEach(
- (key, dataBytes) -> {
- // Adding all allowed properties.
- if
(!KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(key)) {
- if (consumerConfig.get(key) == null) {
- throw new IllegalArgumentException(
- "Encoded value of the consumer config property " + key +
" was null");
+ try {
+ Read<?, ?> transform = KafkaIO.read();
+
+ Map<String, byte[]> consumerConfig =
configRow.getMap("consumer_config");
+ if (consumerConfig != null) {
+ Map<String, Object> updatedConsumerConfig = new HashMap<>();
+ consumerConfig.forEach(
+ (key, dataBytes) -> {
+ // Adding all allowed properties.
+ if
(!KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(key)) {
+ if (consumerConfig.get(key) == null) {
+ throw new IllegalArgumentException(
+ "Encoded value of the consumer config property " + key
+ " was null");
+ }
+ try {
+ updatedConsumerConfig.put(key,
fromByteArray(consumerConfig.get(key)));
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
}
- updatedConsumerConfig.put(key,
fromByteArray(consumerConfig.get(key)));
- }
- });
- transform = transform.withConsumerConfigUpdates(updatedConsumerConfig);
- }
- Collection<String> topics = configRow.getArray("topics");
- if (topics != null) {
- transform = transform.withTopics(new ArrayList<>(topics));
- }
- Collection<Row> topicPartitionRows =
configRow.getArray("topic_partitions");
- if (topicPartitionRows != null) {
- Collection<TopicPartition> topicPartitions =
- topicPartitionRows.stream()
- .map(
- row -> {
- String topic = row.getString("topic");
- if (topic == null) {
- throw new IllegalArgumentException("Expected the topic
to be not null");
- }
- Integer partition = row.getInt32("partition");
- if (partition == null) {
- throw new IllegalArgumentException("Expected the
partition to be not null");
- }
- return new TopicPartition(topic, partition);
- })
- .collect(Collectors.toList());
- transform =
transform.withTopicPartitions(Lists.newArrayList(topicPartitions));
- }
- String topicPattern = configRow.getString("topic_pattern");
- if (topicPattern != null) {
- transform = transform.withTopicPattern(topicPattern);
- }
+ });
+ transform =
transform.withConsumerConfigUpdates(updatedConsumerConfig);
+ }
+ Collection<String> topics = configRow.getArray("topics");
+ if (topics != null) {
+ transform = transform.withTopics(new ArrayList<>(topics));
+ }
+ Collection<Row> topicPartitionRows =
configRow.getArray("topic_partitions");
+ if (topicPartitionRows != null) {
+ Collection<TopicPartition> topicPartitions =
+ topicPartitionRows.stream()
+ .map(
+ row -> {
+ String topic = row.getString("topic");
+ if (topic == null) {
+ throw new IllegalArgumentException("Expected the
topic to be not null");
+ }
+ Integer partition = row.getInt32("partition");
+ if (partition == null) {
+ throw new IllegalArgumentException(
+ "Expected the partition to be not null");
+ }
+ return new TopicPartition(topic, partition);
+ })
+ .collect(Collectors.toList());
+ transform =
transform.withTopicPartitions(Lists.newArrayList(topicPartitions));
+ }
+ String topicPattern = configRow.getString("topic_pattern");
+ if (topicPattern != null) {
+ transform = transform.withTopicPattern(topicPattern);
+ }
- byte[] keyDeserializerProvider =
configRow.getBytes("key_deserializer_provider");
- if (keyDeserializerProvider != null) {
+ byte[] keyDeserializerProvider =
configRow.getBytes("key_deserializer_provider");
+ if (keyDeserializerProvider != null) {
+
+ byte[] keyCoder = configRow.getBytes("key_coder");
+ if (keyCoder != null) {
+ transform =
+ transform.withKeyDeserializerProviderAndCoder(
+ (DeserializerProvider)
fromByteArray(keyDeserializerProvider),
+ (org.apache.beam.sdk.coders.Coder)
fromByteArray(keyCoder));
+ } else {
+ transform =
+ transform.withKeyDeserializer(
+ (DeserializerProvider)
fromByteArray(keyDeserializerProvider));
+ }
+ }
+
+ byte[] valueDeserializerProvider =
configRow.getBytes("value_deserializer_provider");
+ if (valueDeserializerProvider != null) {
+ byte[] valueCoder = configRow.getBytes("value_coder");
+ if (valueCoder != null) {
+ transform =
+ transform.withValueDeserializerProviderAndCoder(
+ (DeserializerProvider)
fromByteArray(valueDeserializerProvider),
+ (org.apache.beam.sdk.coders.Coder)
fromByteArray(valueCoder));
+ } else {
+ transform =
+ transform.withValueDeserializer(
+ (DeserializerProvider)
fromByteArray(valueDeserializerProvider));
+ }
+ }
- byte[] keyCoder = configRow.getBytes("key_coder");
- if (keyCoder != null) {
+ byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn");
+ if (consumerFactoryFn != null) {
+ transform =
+ transform.withConsumerFactoryFn(
+ (SerializableFunction<Map<String, Object>, Consumer<byte[],
byte[]>>)
+ fromByteArray(consumerFactoryFn));
+ }
+ byte[] watermarkFn = configRow.getBytes("watermark_fn");
+ if (watermarkFn != null) {
+ transform = transform.withWatermarkFn2((SerializableFunction)
fromByteArray(watermarkFn));
+ }
+ Long maxNumRecords = configRow.getInt64("max_num_records");
+ if (maxNumRecords != null) {
+ transform = transform.withMaxNumRecords(maxNumRecords);
+ }
+ Duration maxReadTime = configRow.getValue("max_read_time");
+ if (maxReadTime != null) {
transform =
- transform.withKeyDeserializerProviderAndCoder(
- (DeserializerProvider)
fromByteArray(keyDeserializerProvider),
- (org.apache.beam.sdk.coders.Coder) fromByteArray(keyCoder));
- } else {
+
transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
+ }
+ Instant startReadTime = configRow.getValue("start_read_time");
+ if (startReadTime != null) {
+ transform = transform.withStartReadTime(startReadTime);
+ }
+ Instant stopReadTime = configRow.getValue("stop_read_time");
+ if (stopReadTime != null) {
+ transform = transform.withStopReadTime(stopReadTime);
+ }
+ Boolean isCommitOffsetFinalizeEnabled =
+ configRow.getBoolean("is_commit_offset_finalize_enabled");
+ if (isCommitOffsetFinalizeEnabled != null &&
isCommitOffsetFinalizeEnabled) {
+ transform = transform.commitOffsetsInFinalize();
+ }
+ Boolean isDynamicRead = configRow.getBoolean("is_dynamic_read");
+ if (isDynamicRead != null && isDynamicRead) {
+ Duration watchTopicPartitionDuration =
+ configRow.getValue("watch_topic_partition_duration");
+ if (watchTopicPartitionDuration == null) {
+ throw new IllegalArgumentException(
+ "Expected watchTopicPartitionDuration to be available when
isDynamicRead is set to true");
+ }
transform =
- transform.withKeyDeserializer(
- (DeserializerProvider)
fromByteArray(keyDeserializerProvider));
+ transform.withDynamicRead(
+
org.joda.time.Duration.millis(watchTopicPartitionDuration.toMillis()));
}
- }
- byte[] valueDeserializerProvider =
configRow.getBytes("value_deserializer_provider");
- if (valueDeserializerProvider != null) {
- byte[] valueCoder = configRow.getBytes("value_coder");
- if (valueCoder != null) {
- transform =
- transform.withValueDeserializerProviderAndCoder(
- (DeserializerProvider)
fromByteArray(valueDeserializerProvider),
- (org.apache.beam.sdk.coders.Coder)
fromByteArray(valueCoder));
- } else {
+ byte[] timestampPolicyFactory =
configRow.getBytes("timestamp_policy_factory");
+ if (timestampPolicyFactory != null) {
transform =
- transform.withValueDeserializer(
- (DeserializerProvider)
fromByteArray(valueDeserializerProvider));
+ transform.withTimestampPolicyFactory(
+ (TimestampPolicyFactory)
fromByteArray(timestampPolicyFactory));
+ }
+ Map<String, byte[]> offsetConsumerConfig =
configRow.getMap("offset_consumer_config");
+ if (offsetConsumerConfig != null) {
+ Map<String, Object> updatedOffsetConsumerConfig = new HashMap<>();
+ offsetConsumerConfig.forEach(
+ (key, dataBytes) -> {
+ if (offsetConsumerConfig.get(key) == null) {
+ throw new IllegalArgumentException(
+ "Encoded value for the offset consumer config key " +
key + " was null.");
+ }
+ try {
+ updatedOffsetConsumerConfig.put(
+ key, fromByteArray(offsetConsumerConfig.get(key)));
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ transform =
transform.withOffsetConsumerConfigOverrides(updatedOffsetConsumerConfig);
}
- }
- byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn");
- if (consumerFactoryFn != null) {
- transform =
- transform.withConsumerFactoryFn(
- (SerializableFunction<Map<String, Object>, Consumer<byte[],
byte[]>>)
- fromByteArray(consumerFactoryFn));
- }
- byte[] watermarkFn = configRow.getBytes("watermark_fn");
- if (watermarkFn != null) {
- transform = transform.withWatermarkFn2((SerializableFunction)
fromByteArray(watermarkFn));
- }
- Long maxNumRecords = configRow.getInt64("max_num_records");
- if (maxNumRecords != null) {
- transform = transform.withMaxNumRecords(maxNumRecords);
- }
- Duration maxReadTime = configRow.getValue("max_read_time");
- if (maxReadTime != null) {
- transform =
-
transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
- }
- Instant startReadTime = configRow.getValue("start_read_time");
- if (startReadTime != null) {
- transform = transform.withStartReadTime(startReadTime);
- }
- Instant stopReadTime = configRow.getValue("stop_read_time");
- if (stopReadTime != null) {
- transform = transform.withStopReadTime(stopReadTime);
- }
- Boolean isCommitOffsetFinalizeEnabled =
- configRow.getBoolean("is_commit_offset_finalize_enabled");
- if (isCommitOffsetFinalizeEnabled != null &&
isCommitOffsetFinalizeEnabled) {
- transform = transform.commitOffsetsInFinalize();
- }
- Boolean isDynamicRead = configRow.getBoolean("is_dynamic_read");
- if (isDynamicRead != null && isDynamicRead) {
- Duration watchTopicPartitionDuration =
configRow.getValue("watch_topic_partition_duration");
- if (watchTopicPartitionDuration == null) {
- throw new IllegalArgumentException(
- "Expected watchTopicPartitionDuration to be available when
isDynamicRead is set to true");
+ byte[] checkStopReadinfFn =
configRow.getBytes("check_stop_reading_fn");
+ if (checkStopReadinfFn != null) {
+ transform =
+ transform.withCheckStopReadingFn(
+ (SerializableFunction<TopicPartition, Boolean>)
+ fromByteArray(checkStopReadinfFn));
}
- transform =
- transform.withDynamicRead(
-
org.joda.time.Duration.millis(watchTopicPartitionDuration.toMillis()));
- }
- byte[] timestampPolicyFactory =
configRow.getBytes("timestamp_policy_factory");
- if (timestampPolicyFactory != null) {
- transform =
- transform.withTimestampPolicyFactory(
- (TimestampPolicyFactory)
fromByteArray(timestampPolicyFactory));
- }
- Map<String, byte[]> offsetConsumerConfig =
configRow.getMap("offset_consumer_config");
- if (offsetConsumerConfig != null) {
- Map<String, Object> updatedOffsetConsumerConfig = new HashMap<>();
- offsetConsumerConfig.forEach(
- (key, dataBytes) -> {
- if (offsetConsumerConfig.get(key) == null) {
- throw new IllegalArgumentException(
- "Encoded value for the offset consumer config key " + key
+ " was null.");
- }
- updatedOffsetConsumerConfig.put(key,
fromByteArray(offsetConsumerConfig.get(key)));
- });
- transform =
transform.withOffsetConsumerConfigOverrides(updatedOffsetConsumerConfig);
- }
+ byte[] badRecordErrorHandlerBytes =
configRow.getBytes("bad_record_error_handler");
+ if (badRecordErrorHandlerBytes != null) {
+ try {
+ transform =
+ transform.withBadRecordErrorHandler(
+ (ErrorHandler) fromByteArray(badRecordErrorHandlerBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
+ "Could not use the provided `ErrorHandler` implementation when
upgrading."
+ + "Using the default.");
+ }
Review Comment:
The transform's behavior differs depending on whether an error handler is
passed at all, so this could break if deserialization fails.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]