This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4264c2c3e25 Fixes breakages of the upgrade feature (#29731)
4264c2c3e25 is described below
commit 4264c2c3e2586eb0a6175d71093509408194de06
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Wed Dec 13 20:11:29 2023 -0800
Fixes breakages of the upgrade feature (#29731)
* Fixes breakages of the upgrade feature
* Fix spotless
* Addressing reviewer comments
* Removing unused import
* Reverting the PreCommit update
---
.../core/construction/TransformUpgrader.java | 13 +-
.../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 705 +++++++++++----------
sdks/java/io/kafka/upgrade/build.gradle | 2 -
.../sdk/io/kafka/upgrade/KafkaIOTranslation.java | 414 ++++++------
.../io/kafka/upgrade/KafkaIOTranslationTest.java | 11 +
5 files changed, 624 insertions(+), 521 deletions(-)
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
index e6dce752d06..b142ab4af1c 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
@@ -49,12 +50,16 @@ import
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannelBuilder;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A utility class that allows upgrading transforms of a given pipeline using
the Beam Transform
* Service.
*/
public class TransformUpgrader implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TransformUpgrader.class);
private static final String UPGRADE_NAMESPACE = "transform:upgrade:";
private ExpansionServiceClientFactory clientFactory;
@@ -405,10 +410,16 @@ public class TransformUpgrader implements AutoCloseable {
* method.
* @return re-generated object.
*/
- public static Object fromByteArray(byte[] bytes) {
+ public static Object fromByteArray(byte[] bytes) throws
InvalidClassException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream in = new ObjectInputStream(bis)) {
return in.readObject();
+ } catch (InvalidClassException e) {
+ LOG.info(
+ "An object cannot be re-generated from the provided byte array.
Caller may use the "
+ + "default value for the parameter when upgrading. Underlying
error: "
+ + e);
+ throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index a0c3bdbece5..1056328eb4c 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -26,6 +26,7 @@ import com.google.auto.service.AutoService;
import
com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import java.io.IOException;
+import java.io.InvalidClassException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
@@ -63,10 +64,14 @@ import
org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings({"rawtypes", "nullness"})
public class BigQueryIOTranslation {
+ private static final Logger LOG =
LoggerFactory.getLogger(BigQueryIOTranslation.class);
+
static class BigQueryIOReadTranslator implements
TransformPayloadTranslator<TypedRead<?>> {
static Schema schema =
@@ -184,105 +189,124 @@ public class BigQueryIOTranslation {
@Override
public TypedRead<?> fromConfigRow(Row configRow) {
- BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
+ try {
+ BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
- String jsonTableRef = configRow.getString("json_table_ref");
- if (jsonTableRef != null) {
- builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
- }
- String query = configRow.getString("query");
- if (query != null) {
- builder = builder.setQuery(StaticValueProvider.of(query));
- }
- Boolean validate = configRow.getBoolean("validate");
- if (validate != null) {
- builder = builder.setValidate(validate);
- }
- Boolean flattenResults = configRow.getBoolean("flatten_results");
- if (flattenResults != null) {
- builder = builder.setFlattenResults(flattenResults);
- }
- Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
- if (useLegacySQL != null) {
- builder = builder.setUseLegacySql(useLegacySQL);
- }
- Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
- if (withTemplateCompatibility != null) {
- builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
- }
- byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
- if (bigqueryServicesBytes != null) {
- builder =
- builder.setBigQueryServices((BigQueryServices)
fromByteArray(bigqueryServicesBytes));
- }
- byte[] parseFnBytes = configRow.getBytes("parse_fn");
- if (parseFnBytes != null) {
- builder = builder.setParseFn((SerializableFunction)
fromByteArray(parseFnBytes));
- }
- byte[] datumReaderFactoryBytes =
configRow.getBytes("datum_reader_factory");
- if (datumReaderFactoryBytes != null) {
- builder =
- builder.setDatumReaderFactory(
- (SerializableFunction) fromByteArray(datumReaderFactoryBytes));
- }
- byte[] queryPriorityBytes = configRow.getBytes("query_priority");
- if (queryPriorityBytes != null) {
- builder = builder.setQueryPriority((QueryPriority)
fromByteArray(queryPriorityBytes));
- }
- String queryLocation = configRow.getString("query_location");
- if (queryLocation != null) {
- builder = builder.setQueryLocation(queryLocation);
- }
- String queryTempDataset = configRow.getString("query_temp_dataset");
- if (queryTempDataset != null) {
- builder = builder.setQueryTempDataset(queryTempDataset);
- }
- byte[] methodBytes = configRow.getBytes("method");
- if (methodBytes != null) {
- builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
- }
- byte[] formatBytes = configRow.getBytes("format");
- if (methodBytes != null) {
- builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
- }
- Collection<String> selectedFields =
configRow.getArray("selected_fields");
- if (selectedFields != null && !selectedFields.isEmpty()) {
-
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
- }
- String rowRestriction = configRow.getString("row_restriction");
- if (rowRestriction != null) {
- builder =
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
- }
- byte[] coderBytes = configRow.getBytes("coder");
- if (coderBytes != null) {
- builder = builder.setCoder((Coder) fromByteArray(coderBytes));
- }
- String kmsKey = configRow.getString("kms_key");
- if (kmsKey != null) {
- builder = builder.setKmsKey(kmsKey);
- }
- byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
- if (typeDescriptorBytes != null) {
- builder = builder.setTypeDescriptor((TypeDescriptor)
fromByteArray(typeDescriptorBytes));
- }
- byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
- if (toBeamRowFnBytes != null) {
- builder = builder.setToBeamRowFn((ToBeamRowFunction)
fromByteArray(toBeamRowFnBytes));
- }
- byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
- if (fromBeamRowFnBytes != null) {
- builder = builder.setFromBeamRowFn((FromBeamRowFunction)
fromByteArray(fromBeamRowFnBytes));
- }
- Boolean useAvroLogicalTypes =
configRow.getBoolean("use_avro_logical_types");
- if (useAvroLogicalTypes != null) {
- builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
- }
- Boolean projectionPushdownApplied =
configRow.getBoolean("projection_pushdown_applied");
- if (projectionPushdownApplied != null) {
- builder =
builder.setProjectionPushdownApplied(projectionPushdownApplied);
- }
+ String jsonTableRef = configRow.getString("json_table_ref");
+ if (jsonTableRef != null) {
+ builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
+ }
+ String query = configRow.getString("query");
+ if (query != null) {
+ builder = builder.setQuery(StaticValueProvider.of(query));
+ }
+ Boolean validate = configRow.getBoolean("validate");
+ if (validate != null) {
+ builder = builder.setValidate(validate);
+ }
+ Boolean flattenResults = configRow.getBoolean("flatten_results");
+ if (flattenResults != null) {
+ builder = builder.setFlattenResults(flattenResults);
+ }
+ Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
+ if (useLegacySQL != null) {
+ builder = builder.setUseLegacySql(useLegacySQL);
+ }
+ Boolean withTemplateCompatibility =
configRow.getBoolean("with_template_compatibility");
+ if (withTemplateCompatibility != null) {
+ builder =
builder.setWithTemplateCompatibility(withTemplateCompatibility);
+ }
+ byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
+ if (bigqueryServicesBytes != null) {
+ try {
+ builder =
+ builder.setBigQueryServices(
+ (BigQueryServices) fromByteArray(bigqueryServicesBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
+ "Could not use the provided `BigQueryServices` implementation
when upgrading."
+ + " Using the default."); // leading space: avoid "upgrading.Using"
+ builder = builder.setBigQueryServices(new BigQueryServicesImpl());
+ }
+ }
+ byte[] parseFnBytes = configRow.getBytes("parse_fn");
+ if (parseFnBytes != null) {
+ builder = builder.setParseFn((SerializableFunction)
fromByteArray(parseFnBytes));
+ }
+ byte[] datumReaderFactoryBytes =
configRow.getBytes("datum_reader_factory");
+ if (datumReaderFactoryBytes != null) {
+ builder =
+ builder.setDatumReaderFactory(
+ (SerializableFunction)
fromByteArray(datumReaderFactoryBytes));
+ }
+ byte[] queryPriorityBytes = configRow.getBytes("query_priority");
+ if (queryPriorityBytes != null) {
+ builder = builder.setQueryPriority((QueryPriority)
fromByteArray(queryPriorityBytes));
+ }
+ String queryLocation = configRow.getString("query_location");
+ if (queryLocation != null) {
+ builder = builder.setQueryLocation(queryLocation);
+ }
+ String queryTempDataset = configRow.getString("query_temp_dataset");
+ if (queryTempDataset != null) {
+ builder = builder.setQueryTempDataset(queryTempDataset);
+ }
+ byte[] methodBytes = configRow.getBytes("method");
+ if (methodBytes != null) {
+ builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
+ }
+ byte[] formatBytes = configRow.getBytes("format");
+ if (formatBytes != null) { // guard on the bytes actually deserialized below
+ builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
+ }
+ Collection<String> selectedFields =
configRow.getArray("selected_fields");
+ if (selectedFields != null && !selectedFields.isEmpty()) {
+
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
+ }
+ String rowRestriction = configRow.getString("row_restriction");
+ if (rowRestriction != null) {
+ builder =
builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
+ }
+ byte[] coderBytes = configRow.getBytes("coder");
+ if (coderBytes != null) {
+ try {
+ builder = builder.setCoder((Coder) fromByteArray(coderBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
+ "Could not use the provided `Coder` implementation when
upgrading."
+ + " Using the default."); // leading space: avoid "upgrading.Using"
+ }
+ }
+ String kmsKey = configRow.getString("kms_key");
+ if (kmsKey != null) {
+ builder = builder.setKmsKey(kmsKey);
+ }
+ byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
+ if (typeDescriptorBytes != null) {
+ builder = builder.setTypeDescriptor((TypeDescriptor)
fromByteArray(typeDescriptorBytes));
+ }
+ byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
+ if (toBeamRowFnBytes != null) {
+ builder = builder.setToBeamRowFn((ToBeamRowFunction)
fromByteArray(toBeamRowFnBytes));
+ }
+ byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
+ if (fromBeamRowFnBytes != null) {
+ builder =
+ builder.setFromBeamRowFn((FromBeamRowFunction)
fromByteArray(fromBeamRowFnBytes));
+ }
+ Boolean useAvroLogicalTypes =
configRow.getBoolean("use_avro_logical_types");
+ if (useAvroLogicalTypes != null) {
+ builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
+ }
+ Boolean projectionPushdownApplied =
configRow.getBoolean("projection_pushdown_applied");
+ if (projectionPushdownApplied != null) {
+ builder =
builder.setProjectionPushdownApplied(projectionPushdownApplied);
+ }
- return builder.build();
+ return builder.build();
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
}
}
@@ -529,239 +553,260 @@ public class BigQueryIOTranslation {
@Override
public Write<?> fromConfigRow(Row configRow) {
- BigQueryIO.Write.Builder builder = new
AutoValue_BigQueryIO_Write.Builder<>();
-
- String jsonTableRef = configRow.getString("json_table_ref");
- if (jsonTableRef != null) {
- builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
- }
- byte[] tableFunctionBytes = configRow.getBytes("table_function");
- if (tableFunctionBytes != null) {
- builder =
- builder.setTableFunction(
- (SerializableFunction<ValueInSingleWindow, TableDestination>)
- fromByteArray(tableFunctionBytes));
- }
- byte[] formatFunctionBytes = configRow.getBytes("format_function");
- if (formatFunctionBytes != null) {
- builder =
- builder.setFormatFunction(
- (SerializableFunction<?, TableRow>)
fromByteArray(formatFunctionBytes));
- }
- byte[] formatRecordOnFailureFunctionBytes =
- configRow.getBytes("format_record_on_failure_function");
- if (tableFunctionBytes != null) {
- builder =
- builder.setFormatRecordOnFailureFunction(
- (SerializableFunction<?, TableRow>)
- fromByteArray(formatRecordOnFailureFunctionBytes));
- }
- byte[] avroRowWriterFactoryBytes =
configRow.getBytes("avro_row_writer_factory");
- if (avroRowWriterFactoryBytes != null) {
- builder =
- builder.setAvroRowWriterFactory(
- (AvroRowWriterFactory)
fromByteArray(avroRowWriterFactoryBytes));
- }
- byte[] avroSchemaFactoryBytes =
configRow.getBytes("avro_schema_factory");
- if (tableFunctionBytes != null) {
- builder =
- builder.setAvroSchemaFactory(
- (SerializableFunction) fromByteArray(avroSchemaFactoryBytes));
- }
- Boolean useAvroLogicalTypes =
configRow.getBoolean("use_avro_logical_types");
- if (useAvroLogicalTypes != null) {
- builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
- }
- byte[] dynamicDestinationsBytes =
configRow.getBytes("dynamic_destinations");
- if (dynamicDestinationsBytes != null) {
- builder =
- builder.setDynamicDestinations(
- (DynamicDestinations) fromByteArray(dynamicDestinationsBytes));
- }
- String jsonSchema = configRow.getString("json_schema");
- if (jsonSchema != null) {
- builder = builder.setJsonSchema(StaticValueProvider.of(jsonSchema));
- }
- String jsonTimePartitioning =
configRow.getString("json_time_partitioning");
- if (jsonTimePartitioning != null) {
- builder =
builder.setJsonTimePartitioning(StaticValueProvider.of(jsonTimePartitioning));
- }
- byte[] clusteringBytes = configRow.getBytes("clustering");
- if (clusteringBytes != null) {
- builder = builder.setClustering((Clustering)
fromByteArray(clusteringBytes));
- }
- byte[] createDispositionBytes = configRow.getBytes("create_disposition");
- if (createDispositionBytes != null) {
- builder =
- builder.setCreateDisposition((CreateDisposition)
fromByteArray(createDispositionBytes));
- }
- byte[] writeDispositionBytes = configRow.getBytes("write_disposition");
- if (writeDispositionBytes != null) {
- builder =
- builder.setWriteDisposition((WriteDisposition)
fromByteArray(writeDispositionBytes));
- }
- Collection<byte[]> schemaUpdateOptionsData =
configRow.getArray("schema_update_options");
- if (schemaUpdateOptionsData != null) {
- Set<SchemaUpdateOption> schemaUpdateOptions =
- schemaUpdateOptionsData.stream()
- .map(data -> (SchemaUpdateOption) fromByteArray(data))
- .collect(Collectors.toSet());
- builder = builder.setSchemaUpdateOptions(schemaUpdateOptions);
- } else {
- // This property is not nullable.
- builder = builder.setSchemaUpdateOptions(Collections.emptySet());
- }
- String tableDescription = configRow.getString("table_description");
- if (tableDescription != null) {
- builder = builder.setTableDescription(tableDescription);
- }
- Boolean validate = configRow.getBoolean("validate");
- if (validate != null) {
- builder = builder.setValidate(validate);
- }
- byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
- if (bigqueryServicesBytes != null) {
- builder =
- builder.setBigQueryServices((BigQueryServices)
fromByteArray(bigqueryServicesBytes));
- }
- Integer maxFilesPerBundle = configRow.getInt32("max_files_per_bundle");
- if (maxFilesPerBundle != null) {
- builder = builder.setMaxFilesPerBundle(maxFilesPerBundle);
- }
- Long maxFileSize = configRow.getInt64("max_file_size");
- if (maxFileSize != null) {
- builder = builder.setMaxFileSize(maxFileSize);
- }
- Integer numFileShards = configRow.getInt32("num_file_shards");
- if (numFileShards != null) {
- builder = builder.setNumFileShards(numFileShards);
- }
- Integer numStorageWriteApiStreams =
configRow.getInt32("num_storage_write_api_streams");
- if (numStorageWriteApiStreams != null) {
- builder =
builder.setNumStorageWriteApiStreams(numStorageWriteApiStreams);
- }
- Boolean propagateSuccessfulStorageApiWrites =
- configRow.getBoolean("propagate_successful_storage_api_writes");
- if (propagateSuccessfulStorageApiWrites != null) {
- builder =
-
builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
- }
- Integer maxFilesPerPartition =
configRow.getInt32("max_files_per_partition");
- if (maxFilesPerPartition != null) {
- builder = builder.setMaxFilesPerPartition(maxFilesPerPartition);
- }
- Long maxBytesPerPartition =
configRow.getInt64("max_bytes_per_partition");
- if (maxBytesPerPartition != null) {
- builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
- }
- Duration triggerringFrequency =
configRow.getValue("triggerring_frequency");
- if (triggerringFrequency != null) {
- builder =
- builder.setTriggeringFrequency(
-
org.joda.time.Duration.millis(triggerringFrequency.toMillis()));
- }
- byte[] methodBytes = configRow.getBytes("method");
- if (methodBytes != null) {
- builder = builder.setMethod((Write.Method) fromByteArray(methodBytes));
- }
- String loadJobProjectId = configRow.getString("load_job_project_id");
- if (loadJobProjectId != null) {
- builder =
builder.setLoadJobProjectId(StaticValueProvider.of(loadJobProjectId));
- }
- byte[] failedInsertRetryPolicyBytes =
configRow.getBytes("failed_insert_retry_policy");
- if (failedInsertRetryPolicyBytes != null) {
- builder =
- builder.setFailedInsertRetryPolicy(
- (InsertRetryPolicy)
fromByteArray(failedInsertRetryPolicyBytes));
- }
- String customGcsTempLocations =
configRow.getString("custom_gcs_temp_location");
- if (customGcsTempLocations != null) {
- builder =
builder.setCustomGcsTempLocation(StaticValueProvider.of(customGcsTempLocations));
- }
- Boolean extendedErrorInfo = configRow.getBoolean("extended_error_info");
- if (extendedErrorInfo != null) {
- builder = builder.setExtendedErrorInfo(extendedErrorInfo);
- }
- Boolean skipInvalidRows = configRow.getBoolean("skip_invalid_rows");
- if (skipInvalidRows != null) {
- builder = builder.setSkipInvalidRows(skipInvalidRows);
- }
- Boolean ignoreUnknownValues =
configRow.getBoolean("ignore_unknown_values");
- if (ignoreUnknownValues != null) {
- builder = builder.setIgnoreUnknownValues(ignoreUnknownValues);
- }
- Boolean ignoreInsertIds = configRow.getBoolean("ignore_insert_ids");
- if (ignoreInsertIds != null) {
- builder = builder.setIgnoreInsertIds(ignoreInsertIds);
- }
- Integer maxRetryJobs = configRow.getInt32("max_retry_jobs");
- if (maxRetryJobs != null) {
- builder = builder.setMaxRetryJobs(maxRetryJobs);
- }
- String kmsKey = configRow.getString("kms_key");
- if (kmsKey != null) {
- builder = builder.setKmsKey(kmsKey);
- }
- Collection<String> primaryKey = configRow.getArray("primary_key");
- if (primaryKey != null && !primaryKey.isEmpty()) {
- builder = builder.setPrimaryKey(ImmutableList.of(primaryKey));
- }
- byte[] defaultMissingValueInterpretationsBytes =
- configRow.getBytes("default_missing_value_interpretation");
- if (defaultMissingValueInterpretationsBytes != null) {
- builder =
- builder.setDefaultMissingValueInterpretation(
- (MissingValueInterpretation)
- fromByteArray(defaultMissingValueInterpretationsBytes));
- }
- Boolean optimizeWrites = configRow.getBoolean("optimize_writes");
- if (optimizeWrites != null) {
- builder = builder.setOptimizeWrites(optimizeWrites);
- }
- Boolean useBeamSchema = configRow.getBoolean("use_beam_schema");
- if (useBeamSchema != null) {
- builder = builder.setUseBeamSchema(useBeamSchema);
- }
- Boolean autoSharding = configRow.getBoolean("auto_sharding");
- if (autoSharding != null) {
- builder = builder.setAutoSharding(autoSharding);
- }
- Boolean propagateSuccessful =
configRow.getBoolean("propagate_successful");
- if (propagateSuccessful != null) {
- builder = builder.setPropagateSuccessful(propagateSuccessful);
- }
- Boolean autoSchemaUpdate = configRow.getBoolean("auto_schema_update");
- if (autoSchemaUpdate != null) {
- builder = builder.setAutoSchemaUpdate(autoSchemaUpdate);
- }
- byte[] writeProtosClasses = configRow.getBytes("write_protos_class");
- if (writeProtosClasses != null) {
- builder =
- builder.setWriteProtosClass(
- (Class)
fromByteArray(defaultMissingValueInterpretationsBytes));
- }
- Boolean directWriteProtos = configRow.getBoolean("direct_write_protos");
- if (directWriteProtos != null) {
- builder = builder.setDirectWriteProtos(directWriteProtos);
- }
- byte[] deterministicRecordIdFnBytes =
configRow.getBytes("deterministic_record_id_fn");
- if (deterministicRecordIdFnBytes != null) {
- builder =
- builder.setDeterministicRecordIdFn(
- (SerializableFunction)
fromByteArray(deterministicRecordIdFnBytes));
- }
- String writeTempDataset = configRow.getString("write_temp_dataset");
- if (writeTempDataset != null) {
- builder = builder.setWriteTempDataset(writeTempDataset);
- }
- byte[] rowMutationInformationFnBytes =
configRow.getBytes("row_mutation_information_fn");
- if (rowMutationInformationFnBytes != null) {
- builder =
- builder.setRowMutationInformationFn(
- (SerializableFunction)
fromByteArray(rowMutationInformationFnBytes));
- }
-
- return builder.build();
+ try {
+ BigQueryIO.Write.Builder builder = new
AutoValue_BigQueryIO_Write.Builder<>();
+
+ String jsonTableRef = configRow.getString("json_table_ref");
+ if (jsonTableRef != null) {
+ builder =
builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
+ }
+ byte[] tableFunctionBytes = configRow.getBytes("table_function");
+ if (tableFunctionBytes != null) {
+ builder =
+ builder.setTableFunction(
+ (SerializableFunction<ValueInSingleWindow, TableDestination>)
+ fromByteArray(tableFunctionBytes));
+ }
+ byte[] formatFunctionBytes = configRow.getBytes("format_function");
+ if (formatFunctionBytes != null) {
+ builder =
+ builder.setFormatFunction(
+ (SerializableFunction<?, TableRow>)
fromByteArray(formatFunctionBytes));
+ }
+ byte[] formatRecordOnFailureFunctionBytes =
+ configRow.getBytes("format_record_on_failure_function");
+ if (formatRecordOnFailureFunctionBytes != null) { // was keyed off table_function
+ builder =
+ builder.setFormatRecordOnFailureFunction(
+ (SerializableFunction<?, TableRow>)
+ fromByteArray(formatRecordOnFailureFunctionBytes));
+ }
+ byte[] avroRowWriterFactoryBytes =
configRow.getBytes("avro_row_writer_factory");
+ if (avroRowWriterFactoryBytes != null) {
+ builder =
+ builder.setAvroRowWriterFactory(
+ (AvroRowWriterFactory)
fromByteArray(avroRowWriterFactoryBytes));
+ }
+ byte[] avroSchemaFactoryBytes =
configRow.getBytes("avro_schema_factory");
+ if (avroSchemaFactoryBytes != null) { // was keyed off table_function
+ builder =
+ builder.setAvroSchemaFactory(
+ (SerializableFunction)
fromByteArray(avroSchemaFactoryBytes));
+ }
+ Boolean useAvroLogicalTypes =
configRow.getBoolean("use_avro_logical_types");
+ if (useAvroLogicalTypes != null) {
+ builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
+ }
+ byte[] dynamicDestinationsBytes =
configRow.getBytes("dynamic_destinations");
+ if (dynamicDestinationsBytes != null) {
+ builder =
+ builder.setDynamicDestinations(
+ (DynamicDestinations)
fromByteArray(dynamicDestinationsBytes));
+ }
+ String jsonSchema = configRow.getString("json_schema");
+ if (jsonSchema != null) {
+ builder = builder.setJsonSchema(StaticValueProvider.of(jsonSchema));
+ }
+ String jsonTimePartitioning =
configRow.getString("json_time_partitioning");
+ if (jsonTimePartitioning != null) {
+ builder =
builder.setJsonTimePartitioning(StaticValueProvider.of(jsonTimePartitioning));
+ }
+ byte[] clusteringBytes = configRow.getBytes("clustering");
+ if (clusteringBytes != null) {
+ builder = builder.setClustering((Clustering)
fromByteArray(clusteringBytes));
+ }
+ byte[] createDispositionBytes =
configRow.getBytes("create_disposition");
+ if (createDispositionBytes != null) {
+ builder =
+ builder.setCreateDisposition(
+ (CreateDisposition) fromByteArray(createDispositionBytes));
+ }
+ byte[] writeDispositionBytes = configRow.getBytes("write_disposition");
+ if (writeDispositionBytes != null) {
+ builder =
+ builder.setWriteDisposition((WriteDisposition)
fromByteArray(writeDispositionBytes));
+ }
+ Collection<byte[]> schemaUpdateOptionsData =
configRow.getArray("schema_update_options");
+ if (schemaUpdateOptionsData != null) {
+ Set<SchemaUpdateOption> schemaUpdateOptions =
+ schemaUpdateOptionsData.stream()
+ .map(
+ data -> {
+ try {
+ return (SchemaUpdateOption) fromByteArray(data);
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toSet());
+ builder = builder.setSchemaUpdateOptions(schemaUpdateOptions);
+ } else {
+ // This property is not nullable.
+ builder = builder.setSchemaUpdateOptions(Collections.emptySet());
+ }
+ String tableDescription = configRow.getString("table_description");
+ if (tableDescription != null) {
+ builder = builder.setTableDescription(tableDescription);
+ }
+ Boolean validate = configRow.getBoolean("validate");
+ if (validate != null) {
+ builder = builder.setValidate(validate);
+ }
+ byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
+ if (bigqueryServicesBytes != null) {
+ try {
+ builder =
+ builder.setBigQueryServices(
+ (BigQueryServices) fromByteArray(bigqueryServicesBytes));
+ } catch (InvalidClassException e) {
+ LOG.warn(
+ "Could not use the provided `BigQueryServices` implementation
when upgrading."
+ + " Using the default."); // leading space: avoid "upgrading.Using"
+ builder = builder.setBigQueryServices(new BigQueryServicesImpl());
+ }
+ }
+ Integer maxFilesPerBundle = configRow.getInt32("max_files_per_bundle");
+ if (maxFilesPerBundle != null) {
+ builder = builder.setMaxFilesPerBundle(maxFilesPerBundle);
+ }
+ Long maxFileSize = configRow.getInt64("max_file_size");
+ if (maxFileSize != null) {
+ builder = builder.setMaxFileSize(maxFileSize);
+ }
+ Integer numFileShards = configRow.getInt32("num_file_shards");
+ if (numFileShards != null) {
+ builder = builder.setNumFileShards(numFileShards);
+ }
+ Integer numStorageWriteApiStreams =
configRow.getInt32("num_storage_write_api_streams");
+ if (numStorageWriteApiStreams != null) {
+ builder =
builder.setNumStorageWriteApiStreams(numStorageWriteApiStreams);
+ }
+ Boolean propagateSuccessfulStorageApiWrites =
+ configRow.getBoolean("propagate_successful_storage_api_writes");
+ if (propagateSuccessfulStorageApiWrites != null) {
+ builder =
+
builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
+ }
+ Integer maxFilesPerPartition =
configRow.getInt32("max_files_per_partition");
+ if (maxFilesPerPartition != null) {
+ builder = builder.setMaxFilesPerPartition(maxFilesPerPartition);
+ }
+ Long maxBytesPerPartition =
configRow.getInt64("max_bytes_per_partition");
+ if (maxBytesPerPartition != null) {
+ builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
+ }
+ Duration triggerringFrequency =
configRow.getValue("triggerring_frequency");
+ if (triggerringFrequency != null) {
+ builder =
+ builder.setTriggeringFrequency(
+
org.joda.time.Duration.millis(triggerringFrequency.toMillis()));
+ }
+ byte[] methodBytes = configRow.getBytes("method");
+ if (methodBytes != null) {
+ builder = builder.setMethod((Write.Method)
fromByteArray(methodBytes));
+ }
+ String loadJobProjectId = configRow.getString("load_job_project_id");
+ if (loadJobProjectId != null) {
+ builder =
builder.setLoadJobProjectId(StaticValueProvider.of(loadJobProjectId));
+ }
+ byte[] failedInsertRetryPolicyBytes =
configRow.getBytes("failed_insert_retry_policy");
+ if (failedInsertRetryPolicyBytes != null) {
+ builder =
+ builder.setFailedInsertRetryPolicy(
+ (InsertRetryPolicy)
fromByteArray(failedInsertRetryPolicyBytes));
+ }
+ String customGcsTempLocations =
configRow.getString("custom_gcs_temp_location");
+ if (customGcsTempLocations != null) {
+ builder =
+
builder.setCustomGcsTempLocation(StaticValueProvider.of(customGcsTempLocations));
+ }
+ Boolean extendedErrorInfo =
configRow.getBoolean("extended_error_info");
+ if (extendedErrorInfo != null) {
+ builder = builder.setExtendedErrorInfo(extendedErrorInfo);
+ }
+ Boolean skipInvalidRows = configRow.getBoolean("skip_invalid_rows");
+ if (skipInvalidRows != null) {
+ builder = builder.setSkipInvalidRows(skipInvalidRows);
+ }
+ Boolean ignoreUnknownValues =
configRow.getBoolean("ignore_unknown_values");
+ if (ignoreUnknownValues != null) {
+ builder = builder.setIgnoreUnknownValues(ignoreUnknownValues);
+ }
+ Boolean ignoreInsertIds = configRow.getBoolean("ignore_insert_ids");
+ if (ignoreInsertIds != null) {
+ builder = builder.setIgnoreInsertIds(ignoreInsertIds);
+ }
+ Integer maxRetryJobs = configRow.getInt32("max_retry_jobs");
+ if (maxRetryJobs != null) {
+ builder = builder.setMaxRetryJobs(maxRetryJobs);
+ }
+ String kmsKey = configRow.getString("kms_key");
+ if (kmsKey != null) {
+ builder = builder.setKmsKey(kmsKey);
+ }
+ Collection<String> primaryKey = configRow.getArray("primary_key");
+ if (primaryKey != null && !primaryKey.isEmpty()) {
+ builder = builder.setPrimaryKey(ImmutableList.of(primaryKey));
+ }
+ byte[] defaultMissingValueInterpretationsBytes =
+ configRow.getBytes("default_missing_value_interpretation");
+ if (defaultMissingValueInterpretationsBytes != null) {
+ builder =
+ builder.setDefaultMissingValueInterpretation(
+ (MissingValueInterpretation)
+ fromByteArray(defaultMissingValueInterpretationsBytes));
+ }
+ Boolean optimizeWrites = configRow.getBoolean("optimize_writes");
+ if (optimizeWrites != null) {
+ builder = builder.setOptimizeWrites(optimizeWrites);
+ }
+ Boolean useBeamSchema = configRow.getBoolean("use_beam_schema");
+ if (useBeamSchema != null) {
+ builder = builder.setUseBeamSchema(useBeamSchema);
+ }
+ Boolean autoSharding = configRow.getBoolean("auto_sharding");
+ if (autoSharding != null) {
+ builder = builder.setAutoSharding(autoSharding);
+ }
+ Boolean propagateSuccessful =
configRow.getBoolean("propagate_successful");
+ if (propagateSuccessful != null) {
+ builder = builder.setPropagateSuccessful(propagateSuccessful);
+ }
+ Boolean autoSchemaUpdate = configRow.getBoolean("auto_schema_update");
+ if (autoSchemaUpdate != null) {
+ builder = builder.setAutoSchemaUpdate(autoSchemaUpdate);
+ }
+ byte[] writeProtosClasses = configRow.getBytes("write_protos_class");
+ if (writeProtosClasses != null) {
+ builder =
+ builder.setWriteProtosClass(
+ (Class)
fromByteArray(writeProtosClasses)); // was reading default_missing_value_interpretation bytes
+ }
+ Boolean directWriteProtos =
configRow.getBoolean("direct_write_protos");
+ if (directWriteProtos != null) {
+ builder = builder.setDirectWriteProtos(directWriteProtos);
+ }
+ byte[] deterministicRecordIdFnBytes =
configRow.getBytes("deterministic_record_id_fn");
+ if (deterministicRecordIdFnBytes != null) {
+ builder =
+ builder.setDeterministicRecordIdFn(
+ (SerializableFunction)
fromByteArray(deterministicRecordIdFnBytes));
+ }
+ String writeTempDataset = configRow.getString("write_temp_dataset");
+ if (writeTempDataset != null) {
+ builder = builder.setWriteTempDataset(writeTempDataset);
+ }
+ byte[] rowMutationInformationFnBytes =
configRow.getBytes("row_mutation_information_fn");
+ if (rowMutationInformationFnBytes != null) {
+ builder =
+ builder.setRowMutationInformationFn(
+ (SerializableFunction)
fromByteArray(rowMutationInformationFnBytes));
+ }
+
+ return builder.build();
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/sdks/java/io/kafka/upgrade/build.gradle
b/sdks/java/io/kafka/upgrade/build.gradle
index 78776e6e826..ca2823740dc 100644
--- a/sdks/java/io/kafka/upgrade/build.gradle
+++ b/sdks/java/io/kafka/upgrade/build.gradle
@@ -16,8 +16,6 @@
* limitations under the License.
*/
-import java.util.stream.Collectors
-
plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.kafka.upgrade',
diff --git
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index 16580cd219b..eedd2282b1f 100644
---
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++
b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -22,6 +22,7 @@ import static
org.apache.beam.runners.core.construction.TransformUpgrader.toByte
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.io.InvalidClassException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -48,6 +49,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration;
import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -203,162 +205,183 @@ public class KafkaIOTranslation {
if (transform.getCheckStopReadingFn() != null) {
fieldValues.put("check_stop_reading_fn",
toByteArray(transform.getCheckStopReadingFn()));
}
+ if (transform.getBadRecordErrorHandler() != null) {
+ throw new RuntimeException(
+ "Upgrading KafkaIO read transforms that have `withBadRecordErrorHandler` property set"
+ + " is not supported yet.");
+ }
return Row.withSchema(schema).withFieldValues(fieldValues).build();
}
@Override
public Read<?, ?> fromConfigRow(Row configRow) {
- Read<?, ?> transform = KafkaIO.read();
-
- Map<String, byte[]> consumerConfig = configRow.getMap("consumer_config");
- if (consumerConfig != null) {
- Map<String, Object> updatedConsumerConfig = new HashMap<>();
- consumerConfig.forEach(
- (key, dataBytes) -> {
- // Adding all allowed properties.
- if
(!KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(key)) {
- if (consumerConfig.get(key) == null) {
- throw new IllegalArgumentException(
- "Encoded value of the consumer config property " + key +
" was null");
+ try {
+ Read<?, ?> transform = KafkaIO.read();
+
+ Map<String, byte[]> consumerConfig =
configRow.getMap("consumer_config");
+ if (consumerConfig != null) {
+ Map<String, Object> updatedConsumerConfig = new HashMap<>();
+ consumerConfig.forEach(
+ (key, dataBytes) -> {
+ // Adding all allowed properties.
+ if
(!KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(key)) {
+ if (consumerConfig.get(key) == null) {
+ throw new IllegalArgumentException(
+ "Encoded value of the consumer config property " + key
+ " was null");
+ }
+ try {
+ updatedConsumerConfig.put(key,
fromByteArray(consumerConfig.get(key)));
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
}
- updatedConsumerConfig.put(key,
fromByteArray(consumerConfig.get(key)));
- }
- });
- transform = transform.withConsumerConfigUpdates(updatedConsumerConfig);
- }
- Collection<String> topics = configRow.getArray("topics");
- if (topics != null) {
- transform = transform.withTopics(new ArrayList<>(topics));
- }
- Collection<Row> topicPartitionRows =
configRow.getArray("topic_partitions");
- if (topicPartitionRows != null) {
- Collection<TopicPartition> topicPartitions =
- topicPartitionRows.stream()
- .map(
- row -> {
- String topic = row.getString("topic");
- if (topic == null) {
- throw new IllegalArgumentException("Expected the topic
to be not null");
- }
- Integer partition = row.getInt32("partition");
- if (partition == null) {
- throw new IllegalArgumentException("Expected the
partition to be not null");
- }
- return new TopicPartition(topic, partition);
- })
- .collect(Collectors.toList());
- transform =
transform.withTopicPartitions(Lists.newArrayList(topicPartitions));
- }
- String topicPattern = configRow.getString("topic_pattern");
- if (topicPattern != null) {
- transform = transform.withTopicPattern(topicPattern);
- }
+ });
+ transform =
transform.withConsumerConfigUpdates(updatedConsumerConfig);
+ }
+ Collection<String> topics = configRow.getArray("topics");
+ if (topics != null) {
+ transform = transform.withTopics(new ArrayList<>(topics));
+ }
+ Collection<Row> topicPartitionRows =
configRow.getArray("topic_partitions");
+ if (topicPartitionRows != null) {
+ Collection<TopicPartition> topicPartitions =
+ topicPartitionRows.stream()
+ .map(
+ row -> {
+ String topic = row.getString("topic");
+ if (topic == null) {
+ throw new IllegalArgumentException("Expected the
topic to be not null");
+ }
+ Integer partition = row.getInt32("partition");
+ if (partition == null) {
+ throw new IllegalArgumentException(
+ "Expected the partition to be not null");
+ }
+ return new TopicPartition(topic, partition);
+ })
+ .collect(Collectors.toList());
+ transform =
transform.withTopicPartitions(Lists.newArrayList(topicPartitions));
+ }
+ String topicPattern = configRow.getString("topic_pattern");
+ if (topicPattern != null) {
+ transform = transform.withTopicPattern(topicPattern);
+ }
- byte[] keyDeserializerProvider =
configRow.getBytes("key_deserializer_provider");
- if (keyDeserializerProvider != null) {
+ byte[] keyDeserializerProvider =
configRow.getBytes("key_deserializer_provider");
+ if (keyDeserializerProvider != null) {
+
+ byte[] keyCoder = configRow.getBytes("key_coder");
+ if (keyCoder != null) {
+ transform =
+ transform.withKeyDeserializerProviderAndCoder(
+ (DeserializerProvider)
fromByteArray(keyDeserializerProvider),
+ (org.apache.beam.sdk.coders.Coder)
fromByteArray(keyCoder));
+ } else {
+ transform =
+ transform.withKeyDeserializer(
+ (DeserializerProvider)
fromByteArray(keyDeserializerProvider));
+ }
+ }
- byte[] keyCoder = configRow.getBytes("key_coder");
- if (keyCoder != null) {
- transform =
- transform.withKeyDeserializerProviderAndCoder(
- (DeserializerProvider)
fromByteArray(keyDeserializerProvider),
- (org.apache.beam.sdk.coders.Coder) fromByteArray(keyCoder));
- } else {
- transform =
- transform.withKeyDeserializer(
- (DeserializerProvider)
fromByteArray(keyDeserializerProvider));
+ byte[] valueDeserializerProvider =
configRow.getBytes("value_deserializer_provider");
+ if (valueDeserializerProvider != null) {
+ byte[] valueCoder = configRow.getBytes("value_coder");
+ if (valueCoder != null) {
+ transform =
+ transform.withValueDeserializerProviderAndCoder(
+ (DeserializerProvider)
fromByteArray(valueDeserializerProvider),
+ (org.apache.beam.sdk.coders.Coder)
fromByteArray(valueCoder));
+ } else {
+ transform =
+ transform.withValueDeserializer(
+ (DeserializerProvider)
fromByteArray(valueDeserializerProvider));
+ }
}
- }
- byte[] valueDeserializerProvider =
configRow.getBytes("value_deserializer_provider");
- if (valueDeserializerProvider != null) {
- byte[] valueCoder = configRow.getBytes("value_coder");
- if (valueCoder != null) {
+ byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn");
+ if (consumerFactoryFn != null) {
+ transform =
+ transform.withConsumerFactoryFn(
+ (SerializableFunction<Map<String, Object>, Consumer<byte[],
byte[]>>)
+ fromByteArray(consumerFactoryFn));
+ }
+ byte[] watermarkFn = configRow.getBytes("watermark_fn");
+ if (watermarkFn != null) {
+ transform = transform.withWatermarkFn2((SerializableFunction)
fromByteArray(watermarkFn));
+ }
+ Long maxNumRecords = configRow.getInt64("max_num_records");
+ if (maxNumRecords != null) {
+ transform = transform.withMaxNumRecords(maxNumRecords);
+ }
+ Duration maxReadTime = configRow.getValue("max_read_time");
+ if (maxReadTime != null) {
transform =
- transform.withValueDeserializerProviderAndCoder(
- (DeserializerProvider)
fromByteArray(valueDeserializerProvider),
- (org.apache.beam.sdk.coders.Coder)
fromByteArray(valueCoder));
- } else {
+
transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
+ }
+ Instant startReadTime = configRow.getValue("start_read_time");
+ if (startReadTime != null) {
+ transform = transform.withStartReadTime(startReadTime);
+ }
+ Instant stopReadTime = configRow.getValue("stop_read_time");
+ if (stopReadTime != null) {
+ transform = transform.withStopReadTime(stopReadTime);
+ }
+ Boolean isCommitOffsetFinalizeEnabled =
+ configRow.getBoolean("is_commit_offset_finalize_enabled");
+ if (isCommitOffsetFinalizeEnabled != null &&
isCommitOffsetFinalizeEnabled) {
+ transform = transform.commitOffsetsInFinalize();
+ }
+ Boolean isDynamicRead = configRow.getBoolean("is_dynamic_read");
+ if (isDynamicRead != null && isDynamicRead) {
+ Duration watchTopicPartitionDuration =
+ configRow.getValue("watch_topic_partition_duration");
+ if (watchTopicPartitionDuration == null) {
+ throw new IllegalArgumentException(
+ "Expected watchTopicPartitionDuration to be available when
isDynamicRead is set to true");
+ }
transform =
- transform.withValueDeserializer(
- (DeserializerProvider)
fromByteArray(valueDeserializerProvider));
+ transform.withDynamicRead(
+
org.joda.time.Duration.millis(watchTopicPartitionDuration.toMillis()));
}
- }
- byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn");
- if (consumerFactoryFn != null) {
- transform =
- transform.withConsumerFactoryFn(
- (SerializableFunction<Map<String, Object>, Consumer<byte[],
byte[]>>)
- fromByteArray(consumerFactoryFn));
- }
- byte[] watermarkFn = configRow.getBytes("watermark_fn");
- if (watermarkFn != null) {
- transform = transform.withWatermarkFn2((SerializableFunction)
fromByteArray(watermarkFn));
- }
- Long maxNumRecords = configRow.getInt64("max_num_records");
- if (maxNumRecords != null) {
- transform = transform.withMaxNumRecords(maxNumRecords);
- }
- Duration maxReadTime = configRow.getValue("max_read_time");
- if (maxReadTime != null) {
- transform =
-
transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
- }
- Instant startReadTime = configRow.getValue("start_read_time");
- if (startReadTime != null) {
- transform = transform.withStartReadTime(startReadTime);
- }
- Instant stopReadTime = configRow.getValue("stop_read_time");
- if (stopReadTime != null) {
- transform = transform.withStopReadTime(stopReadTime);
- }
- Boolean isCommitOffsetFinalizeEnabled =
- configRow.getBoolean("is_commit_offset_finalize_enabled");
- if (isCommitOffsetFinalizeEnabled != null &&
isCommitOffsetFinalizeEnabled) {
- transform = transform.commitOffsetsInFinalize();
- }
- Boolean isDynamicRead = configRow.getBoolean("is_dynamic_read");
- if (isDynamicRead != null && isDynamicRead) {
- Duration watchTopicPartitionDuration =
configRow.getValue("watch_topic_partition_duration");
- if (watchTopicPartitionDuration == null) {
- throw new IllegalArgumentException(
- "Expected watchTopicPartitionDuration to be available when
isDynamicRead is set to true");
+ byte[] timestampPolicyFactory =
configRow.getBytes("timestamp_policy_factory");
+ if (timestampPolicyFactory != null) {
+ transform =
+ transform.withTimestampPolicyFactory(
+ (TimestampPolicyFactory)
fromByteArray(timestampPolicyFactory));
+ }
+ Map<String, byte[]> offsetConsumerConfig =
configRow.getMap("offset_consumer_config");
+ if (offsetConsumerConfig != null) {
+ Map<String, Object> updatedOffsetConsumerConfig = new HashMap<>();
+ offsetConsumerConfig.forEach(
+ (key, dataBytes) -> {
+ if (offsetConsumerConfig.get(key) == null) {
+ throw new IllegalArgumentException(
+ "Encoded value for the offset consumer config key " +
key + " was null.");
+ }
+ try {
+ updatedOffsetConsumerConfig.put(
+ key, fromByteArray(offsetConsumerConfig.get(key)));
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ transform =
transform.withOffsetConsumerConfigOverrides(updatedOffsetConsumerConfig);
}
- transform =
- transform.withDynamicRead(
-
org.joda.time.Duration.millis(watchTopicPartitionDuration.toMillis()));
- }
- byte[] timestampPolicyFactory =
configRow.getBytes("timestamp_policy_factory");
- if (timestampPolicyFactory != null) {
- transform =
- transform.withTimestampPolicyFactory(
- (TimestampPolicyFactory)
fromByteArray(timestampPolicyFactory));
- }
- Map<String, byte[]> offsetConsumerConfig =
configRow.getMap("offset_consumer_config");
- if (offsetConsumerConfig != null) {
- Map<String, Object> updatedOffsetConsumerConfig = new HashMap<>();
- offsetConsumerConfig.forEach(
- (key, dataBytes) -> {
- if (offsetConsumerConfig.get(key) == null) {
- throw new IllegalArgumentException(
- "Encoded value for the offset consumer config key " + key
+ " was null.");
- }
- updatedOffsetConsumerConfig.put(key,
fromByteArray(offsetConsumerConfig.get(key)));
- });
- transform =
transform.withOffsetConsumerConfigOverrides(updatedOffsetConsumerConfig);
- }
+ byte[] checkStopReadinfFn =
configRow.getBytes("check_stop_reading_fn");
+ if (checkStopReadinfFn != null) {
+ transform =
+ transform.withCheckStopReadingFn(
+ (SerializableFunction<TopicPartition, Boolean>)
+ fromByteArray(checkStopReadinfFn));
+ }
- byte[] checkStopReadinfFn = configRow.getBytes("check_stop_reading_fn");
- if (checkStopReadinfFn != null) {
- transform =
- transform.withCheckStopReadingFn(
- (SerializableFunction<TopicPartition, Boolean>)
fromByteArray(checkStopReadinfFn));
+ return transform;
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
}
-
- return transform;
}
}
@@ -476,68 +499,83 @@ public class KafkaIOTranslation {
});
fieldValues.put("producer_config", producerConfigMap);
}
+ if (writeRecordsTransform.getBadRecordErrorHandler() != null
+ && !(writeRecordsTransform.getBadRecordErrorHandler()
+ instanceof ErrorHandler.DefaultErrorHandler)) {
+ throw new RuntimeException(
+ "Upgrading KafkaIO write transforms that have `withBadRecordErrorHandler` property set"
+ + " is not supported yet.");
+ }
return Row.withSchema(schema).withFieldValues(fieldValues).build();
}
@Override
public Write<?, ?> fromConfigRow(Row configRow) {
- Write<?, ?> transform = KafkaIO.write();
+ try {
+ Write<?, ?> transform = KafkaIO.write();
- String bootstrapServers = configRow.getString("bootstrap_servers");
- if (bootstrapServers != null) {
- transform = transform.withBootstrapServers(bootstrapServers);
- }
- String topic = configRow.getValue("topic");
- if (topic != null) {
- transform = transform.withTopic(topic);
- }
- byte[] keySerializerBytes = configRow.getBytes("key_serializer");
- if (keySerializerBytes != null) {
- transform = transform.withKeySerializer((Class)
fromByteArray(keySerializerBytes));
- }
- byte[] valueSerializerBytes = configRow.getBytes("value_serializer");
- if (valueSerializerBytes != null) {
- transform = transform.withValueSerializer((Class)
fromByteArray(valueSerializerBytes));
- }
- byte[] producerFactoryFnBytes =
configRow.getBytes("producer_factory_fn");
- if (producerFactoryFnBytes != null) {
- transform =
- transform.withProducerFactoryFn(
- (SerializableFunction) fromByteArray(producerFactoryFnBytes));
- }
- Boolean isEOS = configRow.getBoolean("eos");
- if (isEOS != null && isEOS) {
- Integer numShards = configRow.getInt32("num_shards");
- String sinkGroupId = configRow.getString("sink_group_id");
- if (numShards == null) {
- throw new IllegalArgumentException(
- "Expected numShards to be provided when EOS is set to true");
+ String bootstrapServers = configRow.getString("bootstrap_servers");
+ if (bootstrapServers != null) {
+ transform = transform.withBootstrapServers(bootstrapServers);
}
- if (sinkGroupId == null) {
- throw new IllegalArgumentException(
- "Expected sinkGroupId to be provided when EOS is set to true");
+ String topic = configRow.getValue("topic");
+ if (topic != null) {
+ transform = transform.withTopic(topic);
+ }
+ byte[] keySerializerBytes = configRow.getBytes("key_serializer");
+ if (keySerializerBytes != null) {
+ transform = transform.withKeySerializer((Class)
fromByteArray(keySerializerBytes));
+ }
+ byte[] valueSerializerBytes = configRow.getBytes("value_serializer");
+ if (valueSerializerBytes != null) {
+ transform = transform.withValueSerializer((Class)
fromByteArray(valueSerializerBytes));
+ }
+ byte[] producerFactoryFnBytes =
configRow.getBytes("producer_factory_fn");
+ if (producerFactoryFnBytes != null) {
+ transform =
+ transform.withProducerFactoryFn(
+ (SerializableFunction)
fromByteArray(producerFactoryFnBytes));
+ }
+ Boolean isEOS = configRow.getBoolean("eos");
+ if (isEOS != null && isEOS) {
+ Integer numShards = configRow.getInt32("num_shards");
+ String sinkGroupId = configRow.getString("sink_group_id");
+ if (numShards == null) {
+ throw new IllegalArgumentException(
+ "Expected numShards to be provided when EOS is set to true");
+ }
+ if (sinkGroupId == null) {
+ throw new IllegalArgumentException(
+ "Expected sinkGroupId to be provided when EOS is set to true");
+ }
+ transform = transform.withEOS(numShards, sinkGroupId);
+ }
+ byte[] consumerFactoryFnBytes =
configRow.getBytes("consumer_factory_fn");
+ if (consumerFactoryFnBytes != null) {
+ transform =
+ transform.withConsumerFactoryFn(
+ (SerializableFunction)
fromByteArray(consumerFactoryFnBytes));
}
- transform = transform.withEOS(numShards, sinkGroupId);
- }
- byte[] consumerFactoryFnBytes =
configRow.getBytes("consumer_factory_fn");
- if (consumerFactoryFnBytes != null) {
- transform =
- transform.withConsumerFactoryFn(
- (SerializableFunction) fromByteArray(consumerFactoryFnBytes));
- }
- Map<String, byte[]> producerConfig = configRow.getMap("producer_config");
- if (producerConfig != null && !producerConfig.isEmpty()) {
- Map<String, Object> updatedProducerConfig = new HashMap<>();
- producerConfig.forEach(
- (key, dataBytes) -> {
- updatedProducerConfig.put(key, fromByteArray((byte[])
dataBytes));
- });
- transform = transform.withProducerConfigUpdates(updatedProducerConfig);
- }
+ Map<String, byte[]> producerConfig =
configRow.getMap("producer_config");
+ if (producerConfig != null && !producerConfig.isEmpty()) {
+ Map<String, Object> updatedProducerConfig = new HashMap<>();
+ producerConfig.forEach(
+ (key, dataBytes) -> {
+ try {
+ updatedProducerConfig.put(key, fromByteArray((byte[])
dataBytes));
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ transform =
transform.withProducerConfigUpdates(updatedProducerConfig);
+ }
- return transform;
+ return transform;
+ } catch (InvalidClassException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index d99bee0ad20..be54d7830d5 100644
---
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
import
org.apache.beam.sdk.io.kafka.upgrade.KafkaIOTranslation.KafkaIOReadWithMetadataTranslator;
import
org.apache.beam.sdk.io.kafka.upgrade.KafkaIOTranslation.KafkaIOWriteTranslator;
import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
@@ -123,6 +124,9 @@ public class KafkaIOTranslationTest {
@Test
public void testReadTransformRowIncludesAllFields() throws Exception {
+ // TODO: support 'withBadRecordErrorHandler' property.
+ List<String> fieldsToIgnore =
+ ImmutableList.of("getBadRecordRouter", "getBadRecordErrorHandler");
List<String> getMethodNames =
Arrays.stream(Read.class.getDeclaredMethods())
.map(
@@ -130,6 +134,7 @@ public class KafkaIOTranslationTest {
return method.getName();
})
.filter(methodName -> methodName.startsWith("get"))
+ .filter(methodName -> !fieldsToIgnore.contains(methodName))
.collect(Collectors.toList());
// Just to make sure that this does not pass trivially.
@@ -189,6 +194,11 @@ public class KafkaIOTranslationTest {
@Test
public void testWriteTransformRowIncludesAllFields() throws Exception {
+ // For these fields, default value will suffice (so no need to serialize
when upgrading).
+ // TODO: support 'withBadRecordErrorHandler' property.
+ List<String> fieldsToIgnore =
+ ImmutableList.of("getBadRecordRouter", "getBadRecordErrorHandler");
+
// Write transform delegates property handling to the WriteRecords class.
So we inspect the
// WriteRecords class here.
List<String> getMethodNames =
@@ -198,6 +208,7 @@ public class KafkaIOTranslationTest {
return method.getName();
})
.filter(methodName -> methodName.startsWith("get"))
+ .filter(methodName -> !fieldsToIgnore.contains(methodName))
.collect(Collectors.toList());
// Just to make sure that this does not pass trivially.