This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4264c2c3e25 Fixes breakages of the upgrade feature (#29731)
4264c2c3e25 is described below

commit 4264c2c3e2586eb0a6175d71093509408194de06
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Wed Dec 13 20:11:29 2023 -0800

    Fixes breakages of the upgrade feature (#29731)
    
    * Fixes breakages of the upgrade feature
    
    * Fix spotless
    
    * Addressing reviewer comments
    
    * Removing unused import
    
    * Reverting the PreCommit update
---
 .../core/construction/TransformUpgrader.java       |  13 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 705 +++++++++++----------
 sdks/java/io/kafka/upgrade/build.gradle            |   2 -
 .../sdk/io/kafka/upgrade/KafkaIOTranslation.java   | 414 ++++++------
 .../io/kafka/upgrade/KafkaIOTranslationTest.java   |  11 +
 5 files changed, 624 insertions(+), 521 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
index e6dce752d06..b142ab4af1c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InvalidClassException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.net.ServerSocket;
@@ -49,12 +50,16 @@ import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannelBuilder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A utility class that allows upgrading transforms of a given pipeline using 
the Beam Transform
  * Service.
  */
 public class TransformUpgrader implements AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TransformUpgrader.class);
   private static final String UPGRADE_NAMESPACE = "transform:upgrade:";
 
   private ExpansionServiceClientFactory clientFactory;
@@ -405,10 +410,16 @@ public class TransformUpgrader implements AutoCloseable {
    *     method.
    * @return re-generated object.
    */
-  public static Object fromByteArray(byte[] bytes) {
+  public static Object fromByteArray(byte[] bytes) throws 
InvalidClassException {
     try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
         ObjectInputStream in = new ObjectInputStream(bis)) {
       return in.readObject();
+    } catch (InvalidClassException e) {
+      LOG.info(
+          "An object cannot be re-generated from the provided byte array. 
Caller may use the "
+              + "default value for the parameter when upgrading. Underlying 
error: "
+              + e);
+      throw e;
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index a0c3bdbece5..1056328eb4c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -26,6 +26,7 @@ import com.google.auto.service.AutoService;
 import 
com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
 import com.google.cloud.bigquery.storage.v1.DataFormat;
 import java.io.IOException;
+import java.io.InvalidClassException;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
@@ -63,10 +64,14 @@ import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"rawtypes", "nullness"})
 public class BigQueryIOTranslation {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(BigQueryIOTranslation.class);
+
   static class BigQueryIOReadTranslator implements 
TransformPayloadTranslator<TypedRead<?>> {
 
     static Schema schema =
@@ -184,105 +189,124 @@ public class BigQueryIOTranslation {
 
     @Override
     public TypedRead<?> fromConfigRow(Row configRow) {
-      BigQueryIO.TypedRead.Builder builder = new 
AutoValue_BigQueryIO_TypedRead.Builder<>();
+      try {
+        BigQueryIO.TypedRead.Builder builder = new 
AutoValue_BigQueryIO_TypedRead.Builder<>();
 
-      String jsonTableRef = configRow.getString("json_table_ref");
-      if (jsonTableRef != null) {
-        builder = 
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
-      }
-      String query = configRow.getString("query");
-      if (query != null) {
-        builder = builder.setQuery(StaticValueProvider.of(query));
-      }
-      Boolean validate = configRow.getBoolean("validate");
-      if (validate != null) {
-        builder = builder.setValidate(validate);
-      }
-      Boolean flattenResults = configRow.getBoolean("flatten_results");
-      if (flattenResults != null) {
-        builder = builder.setFlattenResults(flattenResults);
-      }
-      Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
-      if (useLegacySQL != null) {
-        builder = builder.setUseLegacySql(useLegacySQL);
-      }
-      Boolean withTemplateCompatibility = 
configRow.getBoolean("with_template_compatibility");
-      if (withTemplateCompatibility != null) {
-        builder = 
builder.setWithTemplateCompatibility(withTemplateCompatibility);
-      }
-      byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
-      if (bigqueryServicesBytes != null) {
-        builder =
-            builder.setBigQueryServices((BigQueryServices) 
fromByteArray(bigqueryServicesBytes));
-      }
-      byte[] parseFnBytes = configRow.getBytes("parse_fn");
-      if (parseFnBytes != null) {
-        builder = builder.setParseFn((SerializableFunction) 
fromByteArray(parseFnBytes));
-      }
-      byte[] datumReaderFactoryBytes = 
configRow.getBytes("datum_reader_factory");
-      if (datumReaderFactoryBytes != null) {
-        builder =
-            builder.setDatumReaderFactory(
-                (SerializableFunction) fromByteArray(datumReaderFactoryBytes));
-      }
-      byte[] queryPriorityBytes = configRow.getBytes("query_priority");
-      if (queryPriorityBytes != null) {
-        builder = builder.setQueryPriority((QueryPriority) 
fromByteArray(queryPriorityBytes));
-      }
-      String queryLocation = configRow.getString("query_location");
-      if (queryLocation != null) {
-        builder = builder.setQueryLocation(queryLocation);
-      }
-      String queryTempDataset = configRow.getString("query_temp_dataset");
-      if (queryTempDataset != null) {
-        builder = builder.setQueryTempDataset(queryTempDataset);
-      }
-      byte[] methodBytes = configRow.getBytes("method");
-      if (methodBytes != null) {
-        builder = builder.setMethod((TypedRead.Method) 
fromByteArray(methodBytes));
-      }
-      byte[] formatBytes = configRow.getBytes("format");
-      if (methodBytes != null) {
-        builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
-      }
-      Collection<String> selectedFields = 
configRow.getArray("selected_fields");
-      if (selectedFields != null && !selectedFields.isEmpty()) {
-        
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
-      }
-      String rowRestriction = configRow.getString("row_restriction");
-      if (rowRestriction != null) {
-        builder = 
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
-      }
-      byte[] coderBytes = configRow.getBytes("coder");
-      if (coderBytes != null) {
-        builder = builder.setCoder((Coder) fromByteArray(coderBytes));
-      }
-      String kmsKey = configRow.getString("kms_key");
-      if (kmsKey != null) {
-        builder = builder.setKmsKey(kmsKey);
-      }
-      byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
-      if (typeDescriptorBytes != null) {
-        builder = builder.setTypeDescriptor((TypeDescriptor) 
fromByteArray(typeDescriptorBytes));
-      }
-      byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
-      if (toBeamRowFnBytes != null) {
-        builder = builder.setToBeamRowFn((ToBeamRowFunction) 
fromByteArray(toBeamRowFnBytes));
-      }
-      byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
-      if (fromBeamRowFnBytes != null) {
-        builder = builder.setFromBeamRowFn((FromBeamRowFunction) 
fromByteArray(fromBeamRowFnBytes));
-      }
-      Boolean useAvroLogicalTypes = 
configRow.getBoolean("use_avro_logical_types");
-      if (useAvroLogicalTypes != null) {
-        builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
-      }
-      Boolean projectionPushdownApplied = 
configRow.getBoolean("projection_pushdown_applied");
-      if (projectionPushdownApplied != null) {
-        builder = 
builder.setProjectionPushdownApplied(projectionPushdownApplied);
-      }
+        String jsonTableRef = configRow.getString("json_table_ref");
+        if (jsonTableRef != null) {
+          builder = 
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
+        }
+        String query = configRow.getString("query");
+        if (query != null) {
+          builder = builder.setQuery(StaticValueProvider.of(query));
+        }
+        Boolean validate = configRow.getBoolean("validate");
+        if (validate != null) {
+          builder = builder.setValidate(validate);
+        }
+        Boolean flattenResults = configRow.getBoolean("flatten_results");
+        if (flattenResults != null) {
+          builder = builder.setFlattenResults(flattenResults);
+        }
+        Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
+        if (useLegacySQL != null) {
+          builder = builder.setUseLegacySql(useLegacySQL);
+        }
+        Boolean withTemplateCompatibility = 
configRow.getBoolean("with_template_compatibility");
+        if (withTemplateCompatibility != null) {
+          builder = 
builder.setWithTemplateCompatibility(withTemplateCompatibility);
+        }
+        byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
+        if (bigqueryServicesBytes != null) {
+          try {
+            builder =
+                builder.setBigQueryServices(
+                    (BigQueryServices) fromByteArray(bigqueryServicesBytes));
+          } catch (InvalidClassException e) {
+            LOG.warn(
+                "Could not use the provided `BigQueryServices` implementation 
when upgrading."
+                    + "Using the default.");
+            builder.setBigQueryServices(new BigQueryServicesImpl());
+          }
+        }
+        byte[] parseFnBytes = configRow.getBytes("parse_fn");
+        if (parseFnBytes != null) {
+          builder = builder.setParseFn((SerializableFunction) 
fromByteArray(parseFnBytes));
+        }
+        byte[] datumReaderFactoryBytes = 
configRow.getBytes("datum_reader_factory");
+        if (datumReaderFactoryBytes != null) {
+          builder =
+              builder.setDatumReaderFactory(
+                  (SerializableFunction) 
fromByteArray(datumReaderFactoryBytes));
+        }
+        byte[] queryPriorityBytes = configRow.getBytes("query_priority");
+        if (queryPriorityBytes != null) {
+          builder = builder.setQueryPriority((QueryPriority) 
fromByteArray(queryPriorityBytes));
+        }
+        String queryLocation = configRow.getString("query_location");
+        if (queryLocation != null) {
+          builder = builder.setQueryLocation(queryLocation);
+        }
+        String queryTempDataset = configRow.getString("query_temp_dataset");
+        if (queryTempDataset != null) {
+          builder = builder.setQueryTempDataset(queryTempDataset);
+        }
+        byte[] methodBytes = configRow.getBytes("method");
+        if (methodBytes != null) {
+          builder = builder.setMethod((TypedRead.Method) 
fromByteArray(methodBytes));
+        }
+        byte[] formatBytes = configRow.getBytes("format");
+        if (methodBytes != null) {
+          builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
+        }
+        Collection<String> selectedFields = 
configRow.getArray("selected_fields");
+        if (selectedFields != null && !selectedFields.isEmpty()) {
+          
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
+        }
+        String rowRestriction = configRow.getString("row_restriction");
+        if (rowRestriction != null) {
+          builder = 
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
+        }
+        byte[] coderBytes = configRow.getBytes("coder");
+        if (coderBytes != null) {
+          try {
+            builder = builder.setCoder((Coder) fromByteArray(coderBytes));
+          } catch (InvalidClassException e) {
+            LOG.warn(
+                "Could not use the provided `Coder` implementation when 
upgrading."
+                    + "Using the default.");
+          }
+        }
+        String kmsKey = configRow.getString("kms_key");
+        if (kmsKey != null) {
+          builder = builder.setKmsKey(kmsKey);
+        }
+        byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
+        if (typeDescriptorBytes != null) {
+          builder = builder.setTypeDescriptor((TypeDescriptor) 
fromByteArray(typeDescriptorBytes));
+        }
+        byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
+        if (toBeamRowFnBytes != null) {
+          builder = builder.setToBeamRowFn((ToBeamRowFunction) 
fromByteArray(toBeamRowFnBytes));
+        }
+        byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
+        if (fromBeamRowFnBytes != null) {
+          builder =
+              builder.setFromBeamRowFn((FromBeamRowFunction) 
fromByteArray(fromBeamRowFnBytes));
+        }
+        Boolean useAvroLogicalTypes = 
configRow.getBoolean("use_avro_logical_types");
+        if (useAvroLogicalTypes != null) {
+          builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
+        }
+        Boolean projectionPushdownApplied = 
configRow.getBoolean("projection_pushdown_applied");
+        if (projectionPushdownApplied != null) {
+          builder = 
builder.setProjectionPushdownApplied(projectionPushdownApplied);
+        }
 
-      return builder.build();
+        return builder.build();
+      } catch (InvalidClassException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
@@ -529,239 +553,260 @@ public class BigQueryIOTranslation {
 
     @Override
     public Write<?> fromConfigRow(Row configRow) {
-      BigQueryIO.Write.Builder builder = new 
AutoValue_BigQueryIO_Write.Builder<>();
-
-      String jsonTableRef = configRow.getString("json_table_ref");
-      if (jsonTableRef != null) {
-        builder = 
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
-      }
-      byte[] tableFunctionBytes = configRow.getBytes("table_function");
-      if (tableFunctionBytes != null) {
-        builder =
-            builder.setTableFunction(
-                (SerializableFunction<ValueInSingleWindow, TableDestination>)
-                    fromByteArray(tableFunctionBytes));
-      }
-      byte[] formatFunctionBytes = configRow.getBytes("format_function");
-      if (formatFunctionBytes != null) {
-        builder =
-            builder.setFormatFunction(
-                (SerializableFunction<?, TableRow>) 
fromByteArray(formatFunctionBytes));
-      }
-      byte[] formatRecordOnFailureFunctionBytes =
-          configRow.getBytes("format_record_on_failure_function");
-      if (tableFunctionBytes != null) {
-        builder =
-            builder.setFormatRecordOnFailureFunction(
-                (SerializableFunction<?, TableRow>)
-                    fromByteArray(formatRecordOnFailureFunctionBytes));
-      }
-      byte[] avroRowWriterFactoryBytes = 
configRow.getBytes("avro_row_writer_factory");
-      if (avroRowWriterFactoryBytes != null) {
-        builder =
-            builder.setAvroRowWriterFactory(
-                (AvroRowWriterFactory) 
fromByteArray(avroRowWriterFactoryBytes));
-      }
-      byte[] avroSchemaFactoryBytes = 
configRow.getBytes("avro_schema_factory");
-      if (tableFunctionBytes != null) {
-        builder =
-            builder.setAvroSchemaFactory(
-                (SerializableFunction) fromByteArray(avroSchemaFactoryBytes));
-      }
-      Boolean useAvroLogicalTypes = 
configRow.getBoolean("use_avro_logical_types");
-      if (useAvroLogicalTypes != null) {
-        builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
-      }
-      byte[] dynamicDestinationsBytes = 
configRow.getBytes("dynamic_destinations");
-      if (dynamicDestinationsBytes != null) {
-        builder =
-            builder.setDynamicDestinations(
-                (DynamicDestinations) fromByteArray(dynamicDestinationsBytes));
-      }
-      String jsonSchema = configRow.getString("json_schema");
-      if (jsonSchema != null) {
-        builder = builder.setJsonSchema(StaticValueProvider.of(jsonSchema));
-      }
-      String jsonTimePartitioning = 
configRow.getString("json_time_partitioning");
-      if (jsonTimePartitioning != null) {
-        builder = 
builder.setJsonTimePartitioning(StaticValueProvider.of(jsonTimePartitioning));
-      }
-      byte[] clusteringBytes = configRow.getBytes("clustering");
-      if (clusteringBytes != null) {
-        builder = builder.setClustering((Clustering) 
fromByteArray(clusteringBytes));
-      }
-      byte[] createDispositionBytes = configRow.getBytes("create_disposition");
-      if (createDispositionBytes != null) {
-        builder =
-            builder.setCreateDisposition((CreateDisposition) 
fromByteArray(createDispositionBytes));
-      }
-      byte[] writeDispositionBytes = configRow.getBytes("write_disposition");
-      if (writeDispositionBytes != null) {
-        builder =
-            builder.setWriteDisposition((WriteDisposition) 
fromByteArray(writeDispositionBytes));
-      }
-      Collection<byte[]> schemaUpdateOptionsData = 
configRow.getArray("schema_update_options");
-      if (schemaUpdateOptionsData != null) {
-        Set<SchemaUpdateOption> schemaUpdateOptions =
-            schemaUpdateOptionsData.stream()
-                .map(data -> (SchemaUpdateOption) fromByteArray(data))
-                .collect(Collectors.toSet());
-        builder = builder.setSchemaUpdateOptions(schemaUpdateOptions);
-      } else {
-        // This property is not nullable.
-        builder = builder.setSchemaUpdateOptions(Collections.emptySet());
-      }
-      String tableDescription = configRow.getString("table_description");
-      if (tableDescription != null) {
-        builder = builder.setTableDescription(tableDescription);
-      }
-      Boolean validate = configRow.getBoolean("validate");
-      if (validate != null) {
-        builder = builder.setValidate(validate);
-      }
-      byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
-      if (bigqueryServicesBytes != null) {
-        builder =
-            builder.setBigQueryServices((BigQueryServices) 
fromByteArray(bigqueryServicesBytes));
-      }
-      Integer maxFilesPerBundle = configRow.getInt32("max_files_per_bundle");
-      if (maxFilesPerBundle != null) {
-        builder = builder.setMaxFilesPerBundle(maxFilesPerBundle);
-      }
-      Long maxFileSize = configRow.getInt64("max_file_size");
-      if (maxFileSize != null) {
-        builder = builder.setMaxFileSize(maxFileSize);
-      }
-      Integer numFileShards = configRow.getInt32("num_file_shards");
-      if (numFileShards != null) {
-        builder = builder.setNumFileShards(numFileShards);
-      }
-      Integer numStorageWriteApiStreams = 
configRow.getInt32("num_storage_write_api_streams");
-      if (numStorageWriteApiStreams != null) {
-        builder = 
builder.setNumStorageWriteApiStreams(numStorageWriteApiStreams);
-      }
-      Boolean propagateSuccessfulStorageApiWrites =
-          configRow.getBoolean("propagate_successful_storage_api_writes");
-      if (propagateSuccessfulStorageApiWrites != null) {
-        builder =
-            
builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
-      }
-      Integer maxFilesPerPartition = 
configRow.getInt32("max_files_per_partition");
-      if (maxFilesPerPartition != null) {
-        builder = builder.setMaxFilesPerPartition(maxFilesPerPartition);
-      }
-      Long maxBytesPerPartition = 
configRow.getInt64("max_bytes_per_partition");
-      if (maxBytesPerPartition != null) {
-        builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
-      }
-      Duration triggerringFrequency = 
configRow.getValue("triggerring_frequency");
-      if (triggerringFrequency != null) {
-        builder =
-            builder.setTriggeringFrequency(
-                
org.joda.time.Duration.millis(triggerringFrequency.toMillis()));
-      }
-      byte[] methodBytes = configRow.getBytes("method");
-      if (methodBytes != null) {
-        builder = builder.setMethod((Write.Method) fromByteArray(methodBytes));
-      }
-      String loadJobProjectId = configRow.getString("load_job_project_id");
-      if (loadJobProjectId != null) {
-        builder = 
builder.setLoadJobProjectId(StaticValueProvider.of(loadJobProjectId));
-      }
-      byte[] failedInsertRetryPolicyBytes = 
configRow.getBytes("failed_insert_retry_policy");
-      if (failedInsertRetryPolicyBytes != null) {
-        builder =
-            builder.setFailedInsertRetryPolicy(
-                (InsertRetryPolicy) 
fromByteArray(failedInsertRetryPolicyBytes));
-      }
-      String customGcsTempLocations = 
configRow.getString("custom_gcs_temp_location");
-      if (customGcsTempLocations != null) {
-        builder = 
builder.setCustomGcsTempLocation(StaticValueProvider.of(customGcsTempLocations));
-      }
-      Boolean extendedErrorInfo = configRow.getBoolean("extended_error_info");
-      if (extendedErrorInfo != null) {
-        builder = builder.setExtendedErrorInfo(extendedErrorInfo);
-      }
-      Boolean skipInvalidRows = configRow.getBoolean("skip_invalid_rows");
-      if (skipInvalidRows != null) {
-        builder = builder.setSkipInvalidRows(skipInvalidRows);
-      }
-      Boolean ignoreUnknownValues = 
configRow.getBoolean("ignore_unknown_values");
-      if (ignoreUnknownValues != null) {
-        builder = builder.setIgnoreUnknownValues(ignoreUnknownValues);
-      }
-      Boolean ignoreInsertIds = configRow.getBoolean("ignore_insert_ids");
-      if (ignoreInsertIds != null) {
-        builder = builder.setIgnoreInsertIds(ignoreInsertIds);
-      }
-      Integer maxRetryJobs = configRow.getInt32("max_retry_jobs");
-      if (maxRetryJobs != null) {
-        builder = builder.setMaxRetryJobs(maxRetryJobs);
-      }
-      String kmsKey = configRow.getString("kms_key");
-      if (kmsKey != null) {
-        builder = builder.setKmsKey(kmsKey);
-      }
-      Collection<String> primaryKey = configRow.getArray("primary_key");
-      if (primaryKey != null && !primaryKey.isEmpty()) {
-        builder = builder.setPrimaryKey(ImmutableList.of(primaryKey));
-      }
-      byte[] defaultMissingValueInterpretationsBytes =
-          configRow.getBytes("default_missing_value_interpretation");
-      if (defaultMissingValueInterpretationsBytes != null) {
-        builder =
-            builder.setDefaultMissingValueInterpretation(
-                (MissingValueInterpretation)
-                    fromByteArray(defaultMissingValueInterpretationsBytes));
-      }
-      Boolean optimizeWrites = configRow.getBoolean("optimize_writes");
-      if (optimizeWrites != null) {
-        builder = builder.setOptimizeWrites(optimizeWrites);
-      }
-      Boolean useBeamSchema = configRow.getBoolean("use_beam_schema");
-      if (useBeamSchema != null) {
-        builder = builder.setUseBeamSchema(useBeamSchema);
-      }
-      Boolean autoSharding = configRow.getBoolean("auto_sharding");
-      if (autoSharding != null) {
-        builder = builder.setAutoSharding(autoSharding);
-      }
-      Boolean propagateSuccessful = 
configRow.getBoolean("propagate_successful");
-      if (propagateSuccessful != null) {
-        builder = builder.setPropagateSuccessful(propagateSuccessful);
-      }
-      Boolean autoSchemaUpdate = configRow.getBoolean("auto_schema_update");
-      if (autoSchemaUpdate != null) {
-        builder = builder.setAutoSchemaUpdate(autoSchemaUpdate);
-      }
-      byte[] writeProtosClasses = configRow.getBytes("write_protos_class");
-      if (writeProtosClasses != null) {
-        builder =
-            builder.setWriteProtosClass(
-                (Class) 
fromByteArray(defaultMissingValueInterpretationsBytes));
-      }
-      Boolean directWriteProtos = configRow.getBoolean("direct_write_protos");
-      if (directWriteProtos != null) {
-        builder = builder.setDirectWriteProtos(directWriteProtos);
-      }
-      byte[] deterministicRecordIdFnBytes = 
configRow.getBytes("deterministic_record_id_fn");
-      if (deterministicRecordIdFnBytes != null) {
-        builder =
-            builder.setDeterministicRecordIdFn(
-                (SerializableFunction) 
fromByteArray(deterministicRecordIdFnBytes));
-      }
-      String writeTempDataset = configRow.getString("write_temp_dataset");
-      if (writeTempDataset != null) {
-        builder = builder.setWriteTempDataset(writeTempDataset);
-      }
-      byte[] rowMutationInformationFnBytes = 
configRow.getBytes("row_mutation_information_fn");
-      if (rowMutationInformationFnBytes != null) {
-        builder =
-            builder.setRowMutationInformationFn(
-                (SerializableFunction) 
fromByteArray(rowMutationInformationFnBytes));
-      }
-
-      return builder.build();
+      try {
+        BigQueryIO.Write.Builder builder = new 
AutoValue_BigQueryIO_Write.Builder<>();
+
+        String jsonTableRef = configRow.getString("json_table_ref");
+        if (jsonTableRef != null) {
+          builder = 
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
+        }
+        byte[] tableFunctionBytes = configRow.getBytes("table_function");
+        if (tableFunctionBytes != null) {
+          builder =
+              builder.setTableFunction(
+                  (SerializableFunction<ValueInSingleWindow, TableDestination>)
+                      fromByteArray(tableFunctionBytes));
+        }
+        byte[] formatFunctionBytes = configRow.getBytes("format_function");
+        if (formatFunctionBytes != null) {
+          builder =
+              builder.setFormatFunction(
+                  (SerializableFunction<?, TableRow>) 
fromByteArray(formatFunctionBytes));
+        }
+        byte[] formatRecordOnFailureFunctionBytes =
+            configRow.getBytes("format_record_on_failure_function");
+        if (tableFunctionBytes != null) {
+          builder =
+              builder.setFormatRecordOnFailureFunction(
+                  (SerializableFunction<?, TableRow>)
+                      fromByteArray(formatRecordOnFailureFunctionBytes));
+        }
+        byte[] avroRowWriterFactoryBytes = 
configRow.getBytes("avro_row_writer_factory");
+        if (avroRowWriterFactoryBytes != null) {
+          builder =
+              builder.setAvroRowWriterFactory(
+                  (AvroRowWriterFactory) 
fromByteArray(avroRowWriterFactoryBytes));
+        }
+        byte[] avroSchemaFactoryBytes = 
configRow.getBytes("avro_schema_factory");
+        if (tableFunctionBytes != null) {
+          builder =
+              builder.setAvroSchemaFactory(
+                  (SerializableFunction) 
fromByteArray(avroSchemaFactoryBytes));
+        }
+        Boolean useAvroLogicalTypes = 
configRow.getBoolean("use_avro_logical_types");
+        if (useAvroLogicalTypes != null) {
+          builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
+        }
+        byte[] dynamicDestinationsBytes = 
configRow.getBytes("dynamic_destinations");
+        if (dynamicDestinationsBytes != null) {
+          builder =
+              builder.setDynamicDestinations(
+                  (DynamicDestinations) 
fromByteArray(dynamicDestinationsBytes));
+        }
+        String jsonSchema = configRow.getString("json_schema");
+        if (jsonSchema != null) {
+          builder = builder.setJsonSchema(StaticValueProvider.of(jsonSchema));
+        }
+        String jsonTimePartitioning = 
configRow.getString("json_time_partitioning");
+        if (jsonTimePartitioning != null) {
+          builder = 
builder.setJsonTimePartitioning(StaticValueProvider.of(jsonTimePartitioning));
+        }
+        byte[] clusteringBytes = configRow.getBytes("clustering");
+        if (clusteringBytes != null) {
+          builder = builder.setClustering((Clustering) 
fromByteArray(clusteringBytes));
+        }
+        byte[] createDispositionBytes = 
configRow.getBytes("create_disposition");
+        if (createDispositionBytes != null) {
+          builder =
+              builder.setCreateDisposition(
+                  (CreateDisposition) fromByteArray(createDispositionBytes));
+        }
+        byte[] writeDispositionBytes = configRow.getBytes("write_disposition");
+        if (writeDispositionBytes != null) {
+          builder =
+              builder.setWriteDisposition((WriteDisposition) 
fromByteArray(writeDispositionBytes));
+        }
+        Collection<byte[]> schemaUpdateOptionsData = 
configRow.getArray("schema_update_options");
+        if (schemaUpdateOptionsData != null) {
+          Set<SchemaUpdateOption> schemaUpdateOptions =
+              schemaUpdateOptionsData.stream()
+                  .map(
+                      data -> {
+                        try {
+                          return (SchemaUpdateOption) fromByteArray(data);
+                        } catch (InvalidClassException e) {
+                          throw new RuntimeException(e);
+                        }
+                      })
+                  .collect(Collectors.toSet());
+          builder = builder.setSchemaUpdateOptions(schemaUpdateOptions);
+        } else {
+          // This property is not nullable.
+          builder = builder.setSchemaUpdateOptions(Collections.emptySet());
+        }
+        String tableDescription = configRow.getString("table_description");
+        if (tableDescription != null) {
+          builder = builder.setTableDescription(tableDescription);
+        }
+        Boolean validate = configRow.getBoolean("validate");
+        if (validate != null) {
+          builder = builder.setValidate(validate);
+        }
+        byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
+        if (bigqueryServicesBytes != null) {
+          try {
+            builder =
+                builder.setBigQueryServices(
+                    (BigQueryServices) fromByteArray(bigqueryServicesBytes));
+          } catch (InvalidClassException e) {
+            LOG.warn(
+                "Could not use the provided `BigQueryServices` implementation 
when upgrading."
+                    + "Using the default.");
+            builder.setBigQueryServices(new BigQueryServicesImpl());
+          }
+        }
+        Integer maxFilesPerBundle = configRow.getInt32("max_files_per_bundle");
+        if (maxFilesPerBundle != null) {
+          builder = builder.setMaxFilesPerBundle(maxFilesPerBundle);
+        }
+        Long maxFileSize = configRow.getInt64("max_file_size");
+        if (maxFileSize != null) {
+          builder = builder.setMaxFileSize(maxFileSize);
+        }
+        Integer numFileShards = configRow.getInt32("num_file_shards");
+        if (numFileShards != null) {
+          builder = builder.setNumFileShards(numFileShards);
+        }
+        Integer numStorageWriteApiStreams = 
configRow.getInt32("num_storage_write_api_streams");
+        if (numStorageWriteApiStreams != null) {
+          builder = 
builder.setNumStorageWriteApiStreams(numStorageWriteApiStreams);
+        }
+        Boolean propagateSuccessfulStorageApiWrites =
+            configRow.getBoolean("propagate_successful_storage_api_writes");
+        if (propagateSuccessfulStorageApiWrites != null) {
+          builder =
+              
builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
+        }
+        Integer maxFilesPerPartition = 
configRow.getInt32("max_files_per_partition");
+        if (maxFilesPerPartition != null) {
+          builder = builder.setMaxFilesPerPartition(maxFilesPerPartition);
+        }
+        Long maxBytesPerPartition = 
configRow.getInt64("max_bytes_per_partition");
+        if (maxBytesPerPartition != null) {
+          builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
+        }
+        Duration triggerringFrequency = 
configRow.getValue("triggerring_frequency");
+        if (triggerringFrequency != null) {
+          builder =
+              builder.setTriggeringFrequency(
+                  
org.joda.time.Duration.millis(triggerringFrequency.toMillis()));
+        }
+        byte[] methodBytes = configRow.getBytes("method");
+        if (methodBytes != null) {
+          builder = builder.setMethod((Write.Method) 
fromByteArray(methodBytes));
+        }
+        String loadJobProjectId = configRow.getString("load_job_project_id");
+        if (loadJobProjectId != null) {
+          builder = 
builder.setLoadJobProjectId(StaticValueProvider.of(loadJobProjectId));
+        }
+        byte[] failedInsertRetryPolicyBytes = 
configRow.getBytes("failed_insert_retry_policy");
+        if (failedInsertRetryPolicyBytes != null) {
+          builder =
+              builder.setFailedInsertRetryPolicy(
+                  (InsertRetryPolicy) 
fromByteArray(failedInsertRetryPolicyBytes));
+        }
+        String customGcsTempLocations = 
configRow.getString("custom_gcs_temp_location");
+        if (customGcsTempLocations != null) {
+          builder =
+              
builder.setCustomGcsTempLocation(StaticValueProvider.of(customGcsTempLocations));
+        }
+        Boolean extendedErrorInfo = 
configRow.getBoolean("extended_error_info");
+        if (extendedErrorInfo != null) {
+          builder = builder.setExtendedErrorInfo(extendedErrorInfo);
+        }
+        Boolean skipInvalidRows = configRow.getBoolean("skip_invalid_rows");
+        if (skipInvalidRows != null) {
+          builder = builder.setSkipInvalidRows(skipInvalidRows);
+        }
+        Boolean ignoreUnknownValues = 
configRow.getBoolean("ignore_unknown_values");
+        if (ignoreUnknownValues != null) {
+          builder = builder.setIgnoreUnknownValues(ignoreUnknownValues);
+        }
+        Boolean ignoreInsertIds = configRow.getBoolean("ignore_insert_ids");
+        if (ignoreInsertIds != null) {
+          builder = builder.setIgnoreInsertIds(ignoreInsertIds);
+        }
+        Integer maxRetryJobs = configRow.getInt32("max_retry_jobs");
+        if (maxRetryJobs != null) {
+          builder = builder.setMaxRetryJobs(maxRetryJobs);
+        }
+        String kmsKey = configRow.getString("kms_key");
+        if (kmsKey != null) {
+          builder = builder.setKmsKey(kmsKey);
+        }
+        Collection<String> primaryKey = configRow.getArray("primary_key");
+        if (primaryKey != null && !primaryKey.isEmpty()) {
+          builder = builder.setPrimaryKey(ImmutableList.of(primaryKey));
+        }
+        byte[] defaultMissingValueInterpretationsBytes =
+            configRow.getBytes("default_missing_value_interpretation");
+        if (defaultMissingValueInterpretationsBytes != null) {
+          builder =
+              builder.setDefaultMissingValueInterpretation(
+                  (MissingValueInterpretation)
+                      fromByteArray(defaultMissingValueInterpretationsBytes));
+        }
+        Boolean optimizeWrites = configRow.getBoolean("optimize_writes");
+        if (optimizeWrites != null) {
+          builder = builder.setOptimizeWrites(optimizeWrites);
+        }
+        Boolean useBeamSchema = configRow.getBoolean("use_beam_schema");
+        if (useBeamSchema != null) {
+          builder = builder.setUseBeamSchema(useBeamSchema);
+        }
+        Boolean autoSharding = configRow.getBoolean("auto_sharding");
+        if (autoSharding != null) {
+          builder = builder.setAutoSharding(autoSharding);
+        }
+        Boolean propagateSuccessful = 
configRow.getBoolean("propagate_successful");
+        if (propagateSuccessful != null) {
+          builder = builder.setPropagateSuccessful(propagateSuccessful);
+        }
+        Boolean autoSchemaUpdate = configRow.getBoolean("auto_schema_update");
+        if (autoSchemaUpdate != null) {
+          builder = builder.setAutoSchemaUpdate(autoSchemaUpdate);
+        }
+        byte[] writeProtosClasses = configRow.getBytes("write_protos_class");
+        if (writeProtosClasses != null) {
+          builder =
+              builder.setWriteProtosClass(
+                  (Class) 
fromByteArray(defaultMissingValueInterpretationsBytes));
+        }
+        Boolean directWriteProtos = 
configRow.getBoolean("direct_write_protos");
+        if (directWriteProtos != null) {
+          builder = builder.setDirectWriteProtos(directWriteProtos);
+        }
+        byte[] deterministicRecordIdFnBytes = 
configRow.getBytes("deterministic_record_id_fn");
+        if (deterministicRecordIdFnBytes != null) {
+          builder =
+              builder.setDeterministicRecordIdFn(
+                  (SerializableFunction) 
fromByteArray(deterministicRecordIdFnBytes));
+        }
+        String writeTempDataset = configRow.getString("write_temp_dataset");
+        if (writeTempDataset != null) {
+          builder = builder.setWriteTempDataset(writeTempDataset);
+        }
+        byte[] rowMutationInformationFnBytes = 
configRow.getBytes("row_mutation_information_fn");
+        if (rowMutationInformationFnBytes != null) {
+          builder =
+              builder.setRowMutationInformationFn(
+                  (SerializableFunction) 
fromByteArray(rowMutationInformationFnBytes));
+        }
+
+        return builder.build();
+      } catch (InvalidClassException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
diff --git a/sdks/java/io/kafka/upgrade/build.gradle 
b/sdks/java/io/kafka/upgrade/build.gradle
index 78776e6e826..ca2823740dc 100644
--- a/sdks/java/io/kafka/upgrade/build.gradle
+++ b/sdks/java/io/kafka/upgrade/build.gradle
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 
-import java.util.stream.Collectors
-
 plugins { id 'org.apache.beam.module' }
 applyJavaNature(
         automaticModuleName: 'org.apache.beam.sdk.io.kafka.upgrade',
diff --git 
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
 
b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index 16580cd219b..eedd2282b1f 100644
--- 
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ 
b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -22,6 +22,7 @@ import static 
org.apache.beam.runners.core.construction.TransformUpgrader.toByte
 
 import com.google.auto.service.AutoService;
 import java.io.IOException;
+import java.io.InvalidClassException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -48,6 +49,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration;
 import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -203,162 +205,183 @@ public class KafkaIOTranslation {
       if (transform.getCheckStopReadingFn() != null) {
         fieldValues.put("check_stop_reading_fn", 
toByteArray(transform.getCheckStopReadingFn()));
       }
+      if (transform.getBadRecordErrorHandler() != null) {
+        throw new RuntimeException(
+            "Upgrading KafkaIO read transforms that have 
`withBadRecordErrorHandler` property set"
+                + "is not supported yet.");
+      }
 
       return Row.withSchema(schema).withFieldValues(fieldValues).build();
     }
 
     @Override
     public Read<?, ?> fromConfigRow(Row configRow) {
-      Read<?, ?> transform = KafkaIO.read();
-
-      Map<String, byte[]> consumerConfig = configRow.getMap("consumer_config");
-      if (consumerConfig != null) {
-        Map<String, Object> updatedConsumerConfig = new HashMap<>();
-        consumerConfig.forEach(
-            (key, dataBytes) -> {
-              // Adding all allowed properties.
-              if 
(!KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(key)) {
-                if (consumerConfig.get(key) == null) {
-                  throw new IllegalArgumentException(
-                      "Encoded value of the consumer config property " + key + 
" was null");
+      try {
+        Read<?, ?> transform = KafkaIO.read();
+
+        Map<String, byte[]> consumerConfig = 
configRow.getMap("consumer_config");
+        if (consumerConfig != null) {
+          Map<String, Object> updatedConsumerConfig = new HashMap<>();
+          consumerConfig.forEach(
+              (key, dataBytes) -> {
+                // Adding all allowed properties.
+                if 
(!KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(key)) {
+                  if (consumerConfig.get(key) == null) {
+                    throw new IllegalArgumentException(
+                        "Encoded value of the consumer config property " + key 
+ " was null");
+                  }
+                  try {
+                    updatedConsumerConfig.put(key, 
fromByteArray(consumerConfig.get(key)));
+                  } catch (InvalidClassException e) {
+                    throw new RuntimeException(e);
+                  }
                 }
-                updatedConsumerConfig.put(key, 
fromByteArray(consumerConfig.get(key)));
-              }
-            });
-        transform = transform.withConsumerConfigUpdates(updatedConsumerConfig);
-      }
-      Collection<String> topics = configRow.getArray("topics");
-      if (topics != null) {
-        transform = transform.withTopics(new ArrayList<>(topics));
-      }
-      Collection<Row> topicPartitionRows = 
configRow.getArray("topic_partitions");
-      if (topicPartitionRows != null) {
-        Collection<TopicPartition> topicPartitions =
-            topicPartitionRows.stream()
-                .map(
-                    row -> {
-                      String topic = row.getString("topic");
-                      if (topic == null) {
-                        throw new IllegalArgumentException("Expected the topic 
to be not null");
-                      }
-                      Integer partition = row.getInt32("partition");
-                      if (partition == null) {
-                        throw new IllegalArgumentException("Expected the 
partition to be not null");
-                      }
-                      return new TopicPartition(topic, partition);
-                    })
-                .collect(Collectors.toList());
-        transform = 
transform.withTopicPartitions(Lists.newArrayList(topicPartitions));
-      }
-      String topicPattern = configRow.getString("topic_pattern");
-      if (topicPattern != null) {
-        transform = transform.withTopicPattern(topicPattern);
-      }
+              });
+          transform = 
transform.withConsumerConfigUpdates(updatedConsumerConfig);
+        }
+        Collection<String> topics = configRow.getArray("topics");
+        if (topics != null) {
+          transform = transform.withTopics(new ArrayList<>(topics));
+        }
+        Collection<Row> topicPartitionRows = 
configRow.getArray("topic_partitions");
+        if (topicPartitionRows != null) {
+          Collection<TopicPartition> topicPartitions =
+              topicPartitionRows.stream()
+                  .map(
+                      row -> {
+                        String topic = row.getString("topic");
+                        if (topic == null) {
+                          throw new IllegalArgumentException("Expected the 
topic to be not null");
+                        }
+                        Integer partition = row.getInt32("partition");
+                        if (partition == null) {
+                          throw new IllegalArgumentException(
+                              "Expected the partition to be not null");
+                        }
+                        return new TopicPartition(topic, partition);
+                      })
+                  .collect(Collectors.toList());
+          transform = 
transform.withTopicPartitions(Lists.newArrayList(topicPartitions));
+        }
+        String topicPattern = configRow.getString("topic_pattern");
+        if (topicPattern != null) {
+          transform = transform.withTopicPattern(topicPattern);
+        }
 
-      byte[] keyDeserializerProvider = 
configRow.getBytes("key_deserializer_provider");
-      if (keyDeserializerProvider != null) {
+        byte[] keyDeserializerProvider = 
configRow.getBytes("key_deserializer_provider");
+        if (keyDeserializerProvider != null) {
+
+          byte[] keyCoder = configRow.getBytes("key_coder");
+          if (keyCoder != null) {
+            transform =
+                transform.withKeyDeserializerProviderAndCoder(
+                    (DeserializerProvider) 
fromByteArray(keyDeserializerProvider),
+                    (org.apache.beam.sdk.coders.Coder) 
fromByteArray(keyCoder));
+          } else {
+            transform =
+                transform.withKeyDeserializer(
+                    (DeserializerProvider) 
fromByteArray(keyDeserializerProvider));
+          }
+        }
 
-        byte[] keyCoder = configRow.getBytes("key_coder");
-        if (keyCoder != null) {
-          transform =
-              transform.withKeyDeserializerProviderAndCoder(
-                  (DeserializerProvider) 
fromByteArray(keyDeserializerProvider),
-                  (org.apache.beam.sdk.coders.Coder) fromByteArray(keyCoder));
-        } else {
-          transform =
-              transform.withKeyDeserializer(
-                  (DeserializerProvider) 
fromByteArray(keyDeserializerProvider));
+        byte[] valueDeserializerProvider = 
configRow.getBytes("value_deserializer_provider");
+        if (valueDeserializerProvider != null) {
+          byte[] valueCoder = configRow.getBytes("value_coder");
+          if (valueCoder != null) {
+            transform =
+                transform.withValueDeserializerProviderAndCoder(
+                    (DeserializerProvider) 
fromByteArray(valueDeserializerProvider),
+                    (org.apache.beam.sdk.coders.Coder) 
fromByteArray(valueCoder));
+          } else {
+            transform =
+                transform.withValueDeserializer(
+                    (DeserializerProvider) 
fromByteArray(valueDeserializerProvider));
+          }
         }
-      }
 
-      byte[] valueDeserializerProvider = 
configRow.getBytes("value_deserializer_provider");
-      if (valueDeserializerProvider != null) {
-        byte[] valueCoder = configRow.getBytes("value_coder");
-        if (valueCoder != null) {
+        byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn");
+        if (consumerFactoryFn != null) {
+          transform =
+              transform.withConsumerFactoryFn(
+                  (SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>)
+                      fromByteArray(consumerFactoryFn));
+        }
+        byte[] watermarkFn = configRow.getBytes("watermark_fn");
+        if (watermarkFn != null) {
+          transform = transform.withWatermarkFn2((SerializableFunction) 
fromByteArray(watermarkFn));
+        }
+        Long maxNumRecords = configRow.getInt64("max_num_records");
+        if (maxNumRecords != null) {
+          transform = transform.withMaxNumRecords(maxNumRecords);
+        }
+        Duration maxReadTime = configRow.getValue("max_read_time");
+        if (maxReadTime != null) {
           transform =
-              transform.withValueDeserializerProviderAndCoder(
-                  (DeserializerProvider) 
fromByteArray(valueDeserializerProvider),
-                  (org.apache.beam.sdk.coders.Coder) 
fromByteArray(valueCoder));
-        } else {
+              
transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
+        }
+        Instant startReadTime = configRow.getValue("start_read_time");
+        if (startReadTime != null) {
+          transform = transform.withStartReadTime(startReadTime);
+        }
+        Instant stopReadTime = configRow.getValue("stop_read_time");
+        if (stopReadTime != null) {
+          transform = transform.withStopReadTime(stopReadTime);
+        }
+        Boolean isCommitOffsetFinalizeEnabled =
+            configRow.getBoolean("is_commit_offset_finalize_enabled");
+        if (isCommitOffsetFinalizeEnabled != null && 
isCommitOffsetFinalizeEnabled) {
+          transform = transform.commitOffsetsInFinalize();
+        }
+        Boolean isDynamicRead = configRow.getBoolean("is_dynamic_read");
+        if (isDynamicRead != null && isDynamicRead) {
+          Duration watchTopicPartitionDuration =
+              configRow.getValue("watch_topic_partition_duration");
+          if (watchTopicPartitionDuration == null) {
+            throw new IllegalArgumentException(
+                "Expected watchTopicPartitionDuration to be available when 
isDynamicRead is set to true");
+          }
           transform =
-              transform.withValueDeserializer(
-                  (DeserializerProvider) 
fromByteArray(valueDeserializerProvider));
+              transform.withDynamicRead(
+                  
org.joda.time.Duration.millis(watchTopicPartitionDuration.toMillis()));
         }
-      }
 
-      byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn");
-      if (consumerFactoryFn != null) {
-        transform =
-            transform.withConsumerFactoryFn(
-                (SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>)
-                    fromByteArray(consumerFactoryFn));
-      }
-      byte[] watermarkFn = configRow.getBytes("watermark_fn");
-      if (watermarkFn != null) {
-        transform = transform.withWatermarkFn2((SerializableFunction) 
fromByteArray(watermarkFn));
-      }
-      Long maxNumRecords = configRow.getInt64("max_num_records");
-      if (maxNumRecords != null) {
-        transform = transform.withMaxNumRecords(maxNumRecords);
-      }
-      Duration maxReadTime = configRow.getValue("max_read_time");
-      if (maxReadTime != null) {
-        transform =
-            
transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
-      }
-      Instant startReadTime = configRow.getValue("start_read_time");
-      if (startReadTime != null) {
-        transform = transform.withStartReadTime(startReadTime);
-      }
-      Instant stopReadTime = configRow.getValue("stop_read_time");
-      if (stopReadTime != null) {
-        transform = transform.withStopReadTime(stopReadTime);
-      }
-      Boolean isCommitOffsetFinalizeEnabled =
-          configRow.getBoolean("is_commit_offset_finalize_enabled");
-      if (isCommitOffsetFinalizeEnabled != null && 
isCommitOffsetFinalizeEnabled) {
-        transform = transform.commitOffsetsInFinalize();
-      }
-      Boolean isDynamicRead = configRow.getBoolean("is_dynamic_read");
-      if (isDynamicRead != null && isDynamicRead) {
-        Duration watchTopicPartitionDuration = 
configRow.getValue("watch_topic_partition_duration");
-        if (watchTopicPartitionDuration == null) {
-          throw new IllegalArgumentException(
-              "Expected watchTopicPartitionDuration to be available when 
isDynamicRead is set to true");
+        byte[] timestampPolicyFactory = 
configRow.getBytes("timestamp_policy_factory");
+        if (timestampPolicyFactory != null) {
+          transform =
+              transform.withTimestampPolicyFactory(
+                  (TimestampPolicyFactory) 
fromByteArray(timestampPolicyFactory));
+        }
+        Map<String, byte[]> offsetConsumerConfig = 
configRow.getMap("offset_consumer_config");
+        if (offsetConsumerConfig != null) {
+          Map<String, Object> updatedOffsetConsumerConfig = new HashMap<>();
+          offsetConsumerConfig.forEach(
+              (key, dataBytes) -> {
+                if (offsetConsumerConfig.get(key) == null) {
+                  throw new IllegalArgumentException(
+                      "Encoded value for the offset consumer config key " + 
key + " was null.");
+                }
+                try {
+                  updatedOffsetConsumerConfig.put(
+                      key, fromByteArray(offsetConsumerConfig.get(key)));
+                } catch (InvalidClassException e) {
+                  throw new RuntimeException(e);
+                }
+              });
+          transform = 
transform.withOffsetConsumerConfigOverrides(updatedOffsetConsumerConfig);
         }
-        transform =
-            transform.withDynamicRead(
-                
org.joda.time.Duration.millis(watchTopicPartitionDuration.toMillis()));
-      }
 
-      byte[] timestampPolicyFactory = 
configRow.getBytes("timestamp_policy_factory");
-      if (timestampPolicyFactory != null) {
-        transform =
-            transform.withTimestampPolicyFactory(
-                (TimestampPolicyFactory) 
fromByteArray(timestampPolicyFactory));
-      }
-      Map<String, byte[]> offsetConsumerConfig = 
configRow.getMap("offset_consumer_config");
-      if (offsetConsumerConfig != null) {
-        Map<String, Object> updatedOffsetConsumerConfig = new HashMap<>();
-        offsetConsumerConfig.forEach(
-            (key, dataBytes) -> {
-              if (offsetConsumerConfig.get(key) == null) {
-                throw new IllegalArgumentException(
-                    "Encoded value for the offset consumer config key " + key 
+ " was null.");
-              }
-              updatedOffsetConsumerConfig.put(key, 
fromByteArray(offsetConsumerConfig.get(key)));
-            });
-        transform = 
transform.withOffsetConsumerConfigOverrides(updatedOffsetConsumerConfig);
-      }
+        byte[] checkStopReadinfFn = 
configRow.getBytes("check_stop_reading_fn");
+        if (checkStopReadinfFn != null) {
+          transform =
+              transform.withCheckStopReadingFn(
+                  (SerializableFunction<TopicPartition, Boolean>)
+                      fromByteArray(checkStopReadinfFn));
+        }
 
-      byte[] checkStopReadinfFn = configRow.getBytes("check_stop_reading_fn");
-      if (checkStopReadinfFn != null) {
-        transform =
-            transform.withCheckStopReadingFn(
-                (SerializableFunction<TopicPartition, Boolean>) 
fromByteArray(checkStopReadinfFn));
+        return transform;
+      } catch (InvalidClassException e) {
+        throw new RuntimeException(e);
       }
-
-      return transform;
     }
   }
 
@@ -476,68 +499,83 @@ public class KafkaIOTranslation {
                 });
         fieldValues.put("producer_config", producerConfigMap);
       }
+      if (writeRecordsTransform.getBadRecordErrorHandler() != null
+          && !(writeRecordsTransform.getBadRecordErrorHandler()
+              instanceof ErrorHandler.DefaultErrorHandler)) {
+        throw new RuntimeException(
+            "Upgrading KafkaIO write transforms that have 
`withBadRecordErrorHandler` property set"
+                + "is not supported yet.");
+      }
 
       return Row.withSchema(schema).withFieldValues(fieldValues).build();
     }
 
     @Override
     public Write<?, ?> fromConfigRow(Row configRow) {
-      Write<?, ?> transform = KafkaIO.write();
+      try {
+        Write<?, ?> transform = KafkaIO.write();
 
-      String bootstrapServers = configRow.getString("bootstrap_servers");
-      if (bootstrapServers != null) {
-        transform = transform.withBootstrapServers(bootstrapServers);
-      }
-      String topic = configRow.getValue("topic");
-      if (topic != null) {
-        transform = transform.withTopic(topic);
-      }
-      byte[] keySerializerBytes = configRow.getBytes("key_serializer");
-      if (keySerializerBytes != null) {
-        transform = transform.withKeySerializer((Class) 
fromByteArray(keySerializerBytes));
-      }
-      byte[] valueSerializerBytes = configRow.getBytes("value_serializer");
-      if (valueSerializerBytes != null) {
-        transform = transform.withValueSerializer((Class) 
fromByteArray(valueSerializerBytes));
-      }
-      byte[] producerFactoryFnBytes = 
configRow.getBytes("producer_factory_fn");
-      if (producerFactoryFnBytes != null) {
-        transform =
-            transform.withProducerFactoryFn(
-                (SerializableFunction) fromByteArray(producerFactoryFnBytes));
-      }
-      Boolean isEOS = configRow.getBoolean("eos");
-      if (isEOS != null && isEOS) {
-        Integer numShards = configRow.getInt32("num_shards");
-        String sinkGroupId = configRow.getString("sink_group_id");
-        if (numShards == null) {
-          throw new IllegalArgumentException(
-              "Expected numShards to be provided when EOS is set to true");
+        String bootstrapServers = configRow.getString("bootstrap_servers");
+        if (bootstrapServers != null) {
+          transform = transform.withBootstrapServers(bootstrapServers);
         }
-        if (sinkGroupId == null) {
-          throw new IllegalArgumentException(
-              "Expected sinkGroupId to be provided when EOS is set to true");
+        String topic = configRow.getValue("topic");
+        if (topic != null) {
+          transform = transform.withTopic(topic);
+        }
+        byte[] keySerializerBytes = configRow.getBytes("key_serializer");
+        if (keySerializerBytes != null) {
+          transform = transform.withKeySerializer((Class) 
fromByteArray(keySerializerBytes));
+        }
+        byte[] valueSerializerBytes = configRow.getBytes("value_serializer");
+        if (valueSerializerBytes != null) {
+          transform = transform.withValueSerializer((Class) 
fromByteArray(valueSerializerBytes));
+        }
+        byte[] producerFactoryFnBytes = 
configRow.getBytes("producer_factory_fn");
+        if (producerFactoryFnBytes != null) {
+          transform =
+              transform.withProducerFactoryFn(
+                  (SerializableFunction) 
fromByteArray(producerFactoryFnBytes));
+        }
+        Boolean isEOS = configRow.getBoolean("eos");
+        if (isEOS != null && isEOS) {
+          Integer numShards = configRow.getInt32("num_shards");
+          String sinkGroupId = configRow.getString("sink_group_id");
+          if (numShards == null) {
+            throw new IllegalArgumentException(
+                "Expected numShards to be provided when EOS is set to true");
+          }
+          if (sinkGroupId == null) {
+            throw new IllegalArgumentException(
+                "Expected sinkGroupId to be provided when EOS is set to true");
+          }
+          transform = transform.withEOS(numShards, sinkGroupId);
+        }
+        byte[] consumerFactoryFnBytes = 
configRow.getBytes("consumer_factory_fn");
+        if (consumerFactoryFnBytes != null) {
+          transform =
+              transform.withConsumerFactoryFn(
+                  (SerializableFunction) 
fromByteArray(consumerFactoryFnBytes));
         }
-        transform = transform.withEOS(numShards, sinkGroupId);
-      }
-      byte[] consumerFactoryFnBytes = 
configRow.getBytes("consumer_factory_fn");
-      if (consumerFactoryFnBytes != null) {
-        transform =
-            transform.withConsumerFactoryFn(
-                (SerializableFunction) fromByteArray(consumerFactoryFnBytes));
-      }
 
-      Map<String, byte[]> producerConfig = configRow.getMap("producer_config");
-      if (producerConfig != null && !producerConfig.isEmpty()) {
-        Map<String, Object> updatedProducerConfig = new HashMap<>();
-        producerConfig.forEach(
-            (key, dataBytes) -> {
-              updatedProducerConfig.put(key, fromByteArray((byte[]) 
dataBytes));
-            });
-        transform = transform.withProducerConfigUpdates(updatedProducerConfig);
-      }
+        Map<String, byte[]> producerConfig = 
configRow.getMap("producer_config");
+        if (producerConfig != null && !producerConfig.isEmpty()) {
+          Map<String, Object> updatedProducerConfig = new HashMap<>();
+          producerConfig.forEach(
+              (key, dataBytes) -> {
+                try {
+                  updatedProducerConfig.put(key, fromByteArray((byte[]) 
dataBytes));
+                } catch (InvalidClassException e) {
+                  throw new RuntimeException(e);
+                }
+              });
+          transform = 
transform.withProducerConfigUpdates(updatedProducerConfig);
+        }
 
-      return transform;
+        return transform;
+      } catch (InvalidClassException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
diff --git 
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
 
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index d99bee0ad20..be54d7830d5 100644
--- 
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++ 
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
 import 
org.apache.beam.sdk.io.kafka.upgrade.KafkaIOTranslation.KafkaIOReadWithMetadataTranslator;
 import 
org.apache.beam.sdk.io.kafka.upgrade.KafkaIOTranslation.KafkaIOWriteTranslator;
 import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
@@ -123,6 +124,9 @@ public class KafkaIOTranslationTest {
 
   @Test
   public void testReadTransformRowIncludesAllFields() throws Exception {
+    // TODO: support 'withBadRecordErrorHandler' property.
+    List<String> fieldsToIgnore =
+        ImmutableList.of("getBadRecordRouter", "getBadRecordErrorHandler");
     List<String> getMethodNames =
         Arrays.stream(Read.class.getDeclaredMethods())
             .map(
@@ -130,6 +134,7 @@ public class KafkaIOTranslationTest {
                   return method.getName();
                 })
             .filter(methodName -> methodName.startsWith("get"))
+            .filter(methodName -> !fieldsToIgnore.contains(methodName))
             .collect(Collectors.toList());
 
     // Just to make sure that this does not pass trivially.
@@ -189,6 +194,11 @@ public class KafkaIOTranslationTest {
 
   @Test
   public void testWriteTransformRowIncludesAllFields() throws Exception {
+    // For these fields, default value will suffice (so no need to serialize 
when upgrading).
+    // TODO: support 'withBadRecordErrorHandler' property.
+    List<String> fieldsToIgnore =
+        ImmutableList.of("getBadRecordRouter", "getBadRecordErrorHandler");
+
     // Write transform delegates property handling to the WriteRecords class. 
So we inspect the
     // WriteRecords class here.
     List<String> getMethodNames =
@@ -198,6 +208,7 @@ public class KafkaIOTranslationTest {
                   return method.getName();
                 })
             .filter(methodName -> methodName.startsWith("get"))
+            .filter(methodName -> !fieldsToIgnore.contains(methodName))
             .collect(Collectors.toList());
 
     // Just to make sure that this does not pass trivially.

Reply via email to