This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit a808f74ce0342f93af131de5edc6cae56b292fd7 Author: Tim Brown <[email protected]> AuthorDate: Mon Sep 11 06:35:02 2023 -0500 [HUDI-6728] Update BigQuery manifest sync to support schema evolution (#9482) Adds schema evolution support to the BigQuerySyncTool by converting the Hudi schema into the BigQuery Schema format when creating and updating the table. --- hudi-gcp/pom.xml | 13 + .../hudi/gcp/bigquery/BigQuerySchemaResolver.java | 197 ++++++++++++++ .../hudi/gcp/bigquery/BigQuerySyncConfig.java | 3 +- .../apache/hudi/gcp/bigquery/BigQuerySyncTool.java | 95 ++++--- .../gcp/bigquery/HoodieBigQuerySyncClient.java | 49 +++- .../gcp/bigquery/TestBigQuerySchemaResolver.java | 299 +++++++++++++++++++++ .../hudi/gcp/bigquery/TestBigQuerySyncTool.java | 137 ++++++++++ .../gcp/bigquery/TestHoodieBigQuerySyncClient.java | 119 ++++++++ .../org/apache/hudi/sync/adb/AdbSyncConfig.java | 2 +- .../apache/hudi/sync/common/HoodieSyncClient.java | 4 + .../hudi/sync/common/util/ManifestFileWriter.java | 28 +- .../sync/common/util/TestManifestFileWriter.java | 8 +- 12 files changed, 895 insertions(+), 59 deletions(-) diff --git a/hudi-gcp/pom.xml b/hudi-gcp/pom.xml index 202cbc2f8d9..c0a401551de 100644 --- a/hudi-gcp/pom.xml +++ b/hudi-gcp/pom.xml @@ -84,6 +84,12 @@ See https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google <artifactId>parquet-avro</artifactId> </dependency> + <!-- Avro --> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + <!-- Hadoop --> <dependency> <groupId>org.apache.hadoop</groupId> @@ -97,6 +103,13 @@ See https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-hive-sync</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySchemaResolver.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySchemaResolver.java new file mode 100644 index 00000000000..035ce604e2b --- /dev/null +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySchemaResolver.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.gcp.bigquery; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.exception.HoodieException; + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; + +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Extracts the BigQuery schema from a Hudi table. 
+ */ +class BigQuerySchemaResolver { + private static final BigQuerySchemaResolver INSTANCE = new BigQuerySchemaResolver(TableSchemaResolver::new); + + private final Function<HoodieTableMetaClient, TableSchemaResolver> tableSchemaResolverSupplier; + + @VisibleForTesting + BigQuerySchemaResolver(Function<HoodieTableMetaClient, TableSchemaResolver> tableSchemaResolverSupplier) { + this.tableSchemaResolverSupplier = tableSchemaResolverSupplier; + } + + static BigQuerySchemaResolver getInstance() { + return INSTANCE; + } + + /** + * Get the BigQuery schema for the table. If the BigQuery table is configured with partitioning, the caller must pass in the partition fields so that they are not returned in the schema. + * If the partition fields are in the schema, it will cause an error when querying the table since BigQuery will treat it as a duplicate column. + * @param metaClient Meta client for the Hudi table + * @param partitionFields The fields that are used for partitioning in BigQuery + * @return The BigQuery schema for the table + */ + Schema getTableSchema(HoodieTableMetaClient metaClient, List<String> partitionFields) { + try { + Schema schema = convertSchema(tableSchemaResolverSupplier.apply(metaClient).getTableAvroSchema()); + if (partitionFields.isEmpty()) { + return schema; + } else { + return Schema.of(schema.getFields().stream().filter(field -> !partitionFields.contains(field.getName())).collect(Collectors.toList())); + } + } catch (Exception e) { + throw new HoodieBigQuerySyncException("Failed to get table schema", e); + } + } + + /** + * Converts a BigQuery schema to the string representation used in the BigQuery SQL command to create the manifest based table.
+ * @param schema The BigQuery schema + * @return The string representation of the schema + */ + public static String schemaToSqlString(Schema schema) { + return fieldsToSqlString(schema.getFields()); + } + + /** + * Renders fields as comma separated column definitions: REPEATED fields become {@code ARRAY<type>}, STRUCT fields are expanded recursively and REQUIRED fields get NOT NULL. + */ + private static String fieldsToSqlString(List<Field> fields) { + return fields.stream().map(field -> { + String mode = field.getMode() == Field.Mode.REQUIRED ? " NOT NULL" : ""; + String type; + if (field.getType().getStandardType() == StandardSQLTypeName.STRUCT) { + type = String.format("STRUCT<%s>", fieldsToSqlString(field.getSubFields())); + } else { + type = field.getType().getStandardType().name(); + } + String name = field.getName(); + if (field.getMode() == Field.Mode.REPEATED) { + return String.format("%s ARRAY<%s>", name, type); + } else { + return String.format("%s %s%s", name, type, mode); + } + }).collect(Collectors.joining(", ")); + } + + /** + * Converts an Avro record schema into the equivalent BigQuery schema. + */ + @VisibleForTesting + Schema convertSchema(org.apache.avro.Schema schema) { + return Schema.of(getFields(schema)); + } + + /** + * Converts a single Avro schema into a BigQuery field. A union must pair exactly one type with null and yields a NULLABLE field. + * @param fieldSchema Avro schema of the value + * @param name name of the resulting BigQuery field + * @param nullable whether the field is NULLABLE rather than REQUIRED; arrays are always REPEATED and maps always NULLABLE + * @return the equivalent BigQuery field + * @throws IllegalArgumentException if the schema uses an unsupported type or logical type + * @throws HoodieException if a union is not a single type paired with null + */ + private Field getField(org.apache.avro.Schema fieldSchema, String name, boolean nullable) { + final Field.Mode fieldMode = nullable ?
Field.Mode.NULLABLE : Field.Mode.REQUIRED; + StandardSQLTypeName standardSQLTypeName; + switch (fieldSchema.getType()) { + case INT: + case LONG: + LogicalType logicalType = fieldSchema.getLogicalType(); + if (logicalType == null) { + standardSQLTypeName = StandardSQLTypeName.INT64; + } else if (logicalType.equals(LogicalTypes.date())) { + standardSQLTypeName = StandardSQLTypeName.DATE; + } else if (logicalType.equals(LogicalTypes.timeMillis()) || logicalType.equals(LogicalTypes.timeMicros())) { + standardSQLTypeName = StandardSQLTypeName.TIME; + } else if (logicalType.equals(LogicalTypes.timestampMillis()) || logicalType.equals(LogicalTypes.timestampMicros())) { + standardSQLTypeName = StandardSQLTypeName.TIMESTAMP; + // Local timestamp logical types are matched by name (for compatibility with older Avro versions) and exposed as plain INT64 values, not as a timestamp type + } else if (logicalType.getName().equals("local-timestamp-millis") || logicalType.getName().equals("local-timestamp-micros")) { + standardSQLTypeName = StandardSQLTypeName.INT64; + } else { + throw new IllegalArgumentException("Unexpected logical type in schema: " + logicalType); + } + break; + case ENUM: + case STRING: + standardSQLTypeName = StandardSQLTypeName.STRING; + break; + case BOOLEAN: + standardSQLTypeName = StandardSQLTypeName.BOOL; + break; + case DOUBLE: + case FLOAT: + standardSQLTypeName = StandardSQLTypeName.FLOAT64; + break; + case BYTES: + case FIXED: + LogicalType bytesLogicalType = fieldSchema.getLogicalType(); + if (bytesLogicalType == null) { + standardSQLTypeName = StandardSQLTypeName.BYTES; + } else if (bytesLogicalType instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) bytesLogicalType; + // NUMERIC holds at most 9 fractional digits and 29 integer digits; wider decimals need BIGNUMERIC + boolean fitsInNumeric = decimalType.getScale() <= 9 && decimalType.getPrecision() - decimalType.getScale() <= 29; + standardSQLTypeName = fitsInNumeric ? 
StandardSQLTypeName.NUMERIC : StandardSQLTypeName.BIGNUMERIC; + } else { + throw new IllegalArgumentException("Unexpected logical type in schema: " + bytesLogicalType); + } + break; + case RECORD: + return Field.newBuilder(name, StandardSQLTypeName.STRUCT, + FieldList.of(getFields(fieldSchema))).setMode(fieldMode).build(); + case ARRAY: + Field arrayField = getField(fieldSchema.getElementType(),
"array", true); + return Field.newBuilder(name, arrayField.getType(), arrayField.getSubFields()).setMode(Field.Mode.REPEATED).build(); + case MAP: + // Maps are represented as a struct wrapping a repeated key/value struct with STRING keys + Field keyField = Field.newBuilder("key", StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(); + Field valueField = getField(fieldSchema.getValueType(), "value", false); + Field keyValueField = Field.newBuilder("key_value", StandardSQLTypeName.STRUCT, keyField, valueField).setMode(Field.Mode.REPEATED).build(); + return Field.newBuilder(name, StandardSQLTypeName.STRUCT, keyValueField).setMode(Field.Mode.NULLABLE).build(); + case UNION: + List<org.apache.avro.Schema> subTypes = fieldSchema.getTypes(); + validateUnion(subTypes); + org.apache.avro.Schema fieldSchemaFromUnion = subTypes.get(0).getType() == org.apache.avro.Schema.Type.NULL ? subTypes.get(1) : subTypes.get(0); + // The null branch of the union is what makes the field nullable + return getField(fieldSchemaFromUnion, name, true); + default: + throw new IllegalArgumentException("Unexpected field type: " + fieldSchema.getType()); + } + return Field.newBuilder(name, standardSQLTypeName).setMode(fieldMode).build(); + }
+ + /** + * Converts the fields of an Avro record into BigQuery fields. Non-union fields are REQUIRED; nullable unions are unwrapped by {@link #getField}, which marks them NULLABLE. + */ + private List<Field> getFields(org.apache.avro.Schema schema) { + return schema.getFields().stream() + .map(field -> getField(field.schema(), field.name(), false)) + .collect(Collectors.toList()); + } + + /** + * Ensures a union consists of exactly two branches, one of which is null. + * @throws HoodieException for any other union shape + */ + private void validateUnion(List<org.apache.avro.Schema> subTypes) { + if (subTypes.size() != 2 || (subTypes.get(0).getType() != org.apache.avro.Schema.Type.NULL + && subTypes.get(1).getType() != org.apache.avro.Schema.Type.NULL)) { + throw new HoodieException("Only unions of a single type and null are currently supported"); + } + } +} diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java index 1f99a57b550..8630bacc9c0 100644 --- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.sync.common.HoodieSyncConfig; @@ -168,7 +169,7 @@ public class BigQuerySyncConfig extends HoodieSyncConfig implements Serializable props.setPropertyIfNonNull(BIGQUERY_SYNC_SOURCE_URI.key(), sourceUri); props.setPropertyIfNonNull(BIGQUERY_SYNC_SOURCE_URI_PREFIX.key(), sourceUriPrefix); props.setPropertyIfNonNull(BIGQUERY_SYNC_SYNC_BASE_PATH.key(), hoodieSyncConfigParams.basePath); - 
props.setPropertyIfNonNull(BIGQUERY_SYNC_PARTITION_FIELDS.key(), String.join(",", hoodieSyncConfigParams.partitionFields)); + props.setPropertyIfNonNull(BIGQUERY_SYNC_PARTITION_FIELDS.key(), StringUtils.join(",", hoodieSyncConfigParams.partitionFields));
props.setPropertyIfNonNull(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), hoodieSyncConfigParams.useFileListingFromMetadata); props.setPropertyIfNonNull(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key(), hoodieSyncConfigParams.assumeDatePartitioning); return props; diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java index 47aa342dad0..d44c9d533ab 100644 --- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java @@ -19,25 +19,28 @@ package org.apache.hudi.gcp.bigquery; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.sync.common.HoodieSyncTool; import org.apache.hudi.sync.common.util.ManifestFileWriter; import com.beust.jcommander.JCommander; +import com.google.cloud.bigquery.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.List; import java.util.Properties; import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING; import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME; import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS; -import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE; import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI; import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX; -import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH; import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME; +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE; import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA; /** @@ -52,34 +55,63 @@ public class BigQuerySyncTool extends HoodieSyncTool { private static final Logger LOG = LoggerFactory.getLogger(BigQuerySyncTool.class); - public final BigQuerySyncConfig config; - public final String tableName; - public final String manifestTableName; - public final String versionsTableName; - public final String snapshotViewName; + private final BigQuerySyncConfig config; + private final String tableName; + private final String manifestTableName; + private final String versionsTableName; + private final String snapshotViewName; + private final ManifestFileWriter manifestFileWriter; + private final HoodieBigQuerySyncClient bqSyncClient; + private final HoodieTableMetaClient metaClient; + private final BigQuerySchemaResolver bqSchemaResolver; public BigQuerySyncTool(Properties props) { + // will build file writer, client, etc. 
from configs super(props); this.config = new BigQuerySyncConfig(props); this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME); this.manifestTableName = tableName + "_manifest"; this.versionsTableName = tableName + "_versions"; this.snapshotViewName = tableName; + this.bqSyncClient = new HoodieBigQuerySyncClient(config); + // reuse existing meta client if not provided (only test cases will provide their own meta client) + this.metaClient = bqSyncClient.getMetaClient(); + this.manifestFileWriter = buildManifestFileWriterFromConfig(metaClient, config); + this.bqSchemaResolver = BigQuerySchemaResolver.getInstance(); + } + + @VisibleForTesting // allows us to pass in mocks for the writer and client + BigQuerySyncTool(Properties properties, ManifestFileWriter manifestFileWriter, HoodieBigQuerySyncClient bigQuerySyncClient, HoodieTableMetaClient metaClient, + BigQuerySchemaResolver bigQuerySchemaResolver) { + super(properties); + this.config = new BigQuerySyncConfig(props); + this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME); + this.manifestTableName = tableName + "_manifest"; + this.versionsTableName = tableName + "_versions"; + this.snapshotViewName = tableName; + this.bqSyncClient = bigQuerySyncClient; + this.metaClient = metaClient; + this.manifestFileWriter = manifestFileWriter; + this.bqSchemaResolver = bigQuerySchemaResolver; + } + + private static ManifestFileWriter buildManifestFileWriterFromConfig(HoodieTableMetaClient metaClient, BigQuerySyncConfig config) { + return ManifestFileWriter.builder() + .setMetaClient(metaClient) + .setUseFileListingFromMetadata(config.getBoolean(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA)) + .setAssumeDatePartitioning(config.getBoolean(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING)) + .build(); } @Override public void syncHoodieTable() { - try (HoodieBigQuerySyncClient bqSyncClient = new HoodieBigQuerySyncClient(config)) { - switch (bqSyncClient.getTableType()) { - case COPY_ON_WRITE: - case MERGE_ON_READ: - 
syncTable(bqSyncClient); - break; - default: - throw new UnsupportedOperationException(bqSyncClient.getTableType() + " table type is not supported yet."); - } - } catch (Exception e) { - throw new HoodieBigQuerySyncException("Failed to sync BigQuery for table:" + tableName, e); + switch (bqSyncClient.getTableType()) { + case COPY_ON_WRITE: + case MERGE_ON_READ: + syncTable(bqSyncClient); + break; + default: + throw new UnsupportedOperationException(bqSyncClient.getTableType() + " table type is not supported yet."); } } @@ -92,29 +124,26 @@ public class BigQuerySyncTool extends HoodieSyncTool { } private void syncTable(HoodieBigQuerySyncClient bqSyncClient) { - ValidationUtils.checkState(bqSyncClient.getTableType() == HoodieTableType.COPY_ON_WRITE); LOG.info("Sync hoodie table " + snapshotViewName + " at base path " + bqSyncClient.getBasePath()); if (!bqSyncClient.datasetExists()) { throw new HoodieBigQuerySyncException("Dataset not found: " + config.getString(BIGQUERY_SYNC_DATASET_NAME)); } - ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder() - .setConf(config.getHadoopConf()) - .setBasePath(config.getString(BIGQUERY_SYNC_SYNC_BASE_PATH)) - .setUseFileListingFromMetadata(config.getBoolean(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA)) - .setAssumeDatePartitioning(config.getBoolean(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING)) - .build(); - + List<String> partitionFields = !StringUtils.isNullOrEmpty(config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX)) ? 
config.getSplitStrings(BIGQUERY_SYNC_PARTITION_FIELDS) : Collections.emptyList(); + Schema latestSchema = bqSchemaResolver.getTableSchema(metaClient, partitionFields); if (config.getBoolean(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE)) { manifestFileWriter.writeManifestFile(true); - if (!tableExists(bqSyncClient, tableName)) { bqSyncClient.createTableUsingBqManifestFile( tableName, manifestFileWriter.getManifestSourceUri(true), - config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX)); + config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX), + latestSchema); LOG.info("Completed table " + tableName + " creation using the manifest file"); + } else { + bqSyncClient.updateTableSchema(tableName, latestSchema, partitionFields); + LOG.info("Synced schema for " + tableName); } LOG.info("Sync table complete for " + tableName); @@ -146,6 +175,12 @@ public class BigQuerySyncTool extends HoodieSyncTool { LOG.info("Sync table complete for " + snapshotViewName); } + @Override + public void close() throws Exception { + super.close(); + bqSyncClient.close(); + } + public static void main(String[] args) { final BigQuerySyncConfig.BigQuerySyncConfigParams params = new BigQuerySyncConfig.BigQuerySyncConfigParams(); JCommander cmd = JCommander.newBuilder().addObject(params).build(); diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java index 8c8372a992a..fa32f931049 100644 --- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java @@ -20,6 +20,7 @@ package org.apache.hudi.gcp.bigquery; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.sync.common.HoodieSyncClient; import com.google.cloud.bigquery.BigQuery; @@ -49,6 +50,7 @@ import java.util.Collections; import java.util.List; import 
java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION; import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME; @@ -71,6 +73,15 @@ public class HoodieBigQuerySyncClient extends HoodieSyncClient { this.createBigQueryConnection(); } + @VisibleForTesting + HoodieBigQuerySyncClient(final BigQuerySyncConfig config, final BigQuery bigquery) { + super(config); + this.config = config; + this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID); + this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME); + this.bigquery = bigquery; + } + private void createBigQueryConnection() { if (bigquery == null) { try { @@ -84,14 +95,15 @@ public class HoodieBigQuerySyncClient extends HoodieSyncClient { } } - public void createTableUsingBqManifestFile(String tableName, String bqManifestFileUri, String sourceUriPrefix) { + public void createTableUsingBqManifestFile(String tableName, String bqManifestFileUri, String sourceUriPrefix, Schema schema) { try { - String withClauses = ""; - String extraOptions = ""; + String withClauses = String.format("( %s )", BigQuerySchemaResolver.schemaToSqlString(schema)); + String extraOptions = "enable_list_inference=true,"; if (!StringUtils.isNullOrEmpty(sourceUriPrefix)) { - withClauses = "WITH PARTITION COLUMNS"; - extraOptions = String.format("hive_partition_uri_prefix=\"%s\",", sourceUriPrefix); + withClauses += " WITH PARTITION COLUMNS"; + extraOptions += String.format(" hive_partition_uri_prefix=\"%s\",", sourceUriPrefix); } + String query = String.format( "CREATE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s " @@ -148,6 +160,33 @@ public class HoodieBigQuerySyncClient extends HoodieSyncClient { } } + /** + * Updates the schema for the given table if the schema has changed. 
The schema passed in will not have the partition columns defined, + * so we add them back to the schema with the values read from the existing BigQuery table. This allows us to keep the partition + * field type in sync with how it is registered in BigQuery. If the existing table has no schema registered, only the given schema is applied. + * @param tableName name of the table in BigQuery + * @param schema latest schema for the table + * @param partitionFields names of the partition columns whose definitions are copied from the existing table + * @throws HoodieBigQuerySyncException if the table does not exist + */ + public void updateTableSchema(String tableName, Schema schema, List<String> partitionFields) { + Table existingTable = bigquery.getTable(TableId.of(projectId, datasetName, tableName)); + if (existingTable == null) { + throw new HoodieBigQuerySyncException("Table not found: " + tableName); + } + ExternalTableDefinition definition = existingTable.getDefinition(); + Schema remoteTableSchema = definition.getSchema(); + // Add the partition fields into the schema to avoid conflicts while updating. + // The remote schema may be unset, in which case there are no partition field definitions to carry over. + List<Field> updatedTableFields = new java.util.ArrayList<>(); + if (remoteTableSchema != null) { + updatedTableFields.addAll(remoteTableSchema.getFields().stream() + .filter(field -> partitionFields.contains(field.getName())) + .collect(Collectors.toList())); + } + updatedTableFields.addAll(schema.getFields()); + Schema finalSchema = Schema.of(updatedTableFields); + if (remoteTableSchema != null && remoteTableSchema.equals(finalSchema)) { + return; // No need to update schema. + } + Table updatedTable = existingTable.toBuilder() + .setDefinition(definition.toBuilder().setSchema(finalSchema).setAutodetect(false).build()) + .build(); + + bigquery.update(updatedTable); + } + public void createVersionsTable(String tableName, String sourceUri, String sourceUriPrefix, List<String> partitionFields) { try { ExternalTableDefinition customTable; diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySchemaResolver.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySchemaResolver.java new file mode 100644 index 00000000000..bb45f0b7d56 --- /dev/null +++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySchemaResolver.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.gcp.bigquery; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.StandardSQLTypeName; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.apache.hudi.gcp.bigquery.BigQuerySchemaResolver.schemaToSqlString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestBigQuerySchemaResolver { + private static final com.google.cloud.bigquery.Schema PRIMITIVE_TYPES_BQ_SCHEMA = com.google.cloud.bigquery.Schema.of( + Field.newBuilder("requiredBoolean", StandardSQLTypeName.BOOL).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalBoolean", StandardSQLTypeName.BOOL).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredInt", StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalInt", StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredLong", StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(), + 
Field.newBuilder("optionalLong", StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredDouble", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalDouble", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredFloat", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalFloat", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredString", StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalString", StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredBytes", StandardSQLTypeName.BYTES).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalBytes", StandardSQLTypeName.BYTES).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredEnum", StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalEnum", StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build()); + private static final Schema PRIMITIVE_TYPES = SchemaBuilder.record("testRecord") + .fields() + .requiredBoolean("requiredBoolean") + .optionalBoolean("optionalBoolean") + .requiredInt("requiredInt") + .optionalInt("optionalInt") + .requiredLong("requiredLong") + .optionalLong("optionalLong") + .requiredDouble("requiredDouble") + .optionalDouble("optionalDouble") + .requiredFloat("requiredFloat") + .optionalFloat("optionalFloat") + .requiredString("requiredString") + .optionalString("optionalString") + .requiredBytes("requiredBytes") + .optionalBytes("optionalBytes") + .name("requiredEnum").type().enumeration("REQUIRED_ENUM").symbols("ONE", "TWO").enumDefault("ONE") + .name("optionalEnum").type().optional().enumeration("OPTIONAL_ENUM").symbols("ONE", "TWO") + .endRecord(); + private static final Schema NESTED_FIELDS = SchemaBuilder.record("testRecord") + 
.fields() + .name("nestedOne") + .type() + .optional() + .record("nestedOneType").fields() + .optionalInt("nestedOptionalInt") + .requiredDouble("nestedRequiredDouble") + .name("nestedTwo") + .type(SchemaBuilder.record("nestedTwoType").fields() + .optionalString("doublyNestedString").endRecord()).noDefault() + .endRecord() + .endRecord(); + private static final Schema LISTS = SchemaBuilder.record("testRecord") + .fields() + .name("intList") + .type() + .array() + .items() + .intType().noDefault() + .name("recordList") + .type() + .nullable() + .array() + .items(SchemaBuilder.record("randomname").fields().requiredDouble("requiredDouble").optionalString("optionalString").endRecord()) + .noDefault() + .endRecord(); + private static final BigQuerySchemaResolver SCHEMA_RESOLVER = BigQuerySchemaResolver.getInstance(); + + @Test + void convertSchema_primitiveFields() { + Assertions.assertEquals(PRIMITIVE_TYPES_BQ_SCHEMA, SCHEMA_RESOLVER.convertSchema(PRIMITIVE_TYPES)); + } + + @Test + void convertSchemaToString_primitiveTypes() { + String expectedSqlSchema = "requiredBoolean BOOL NOT NULL, " + + "optionalBoolean BOOL, " + + "requiredInt INT64 NOT NULL, " + + "optionalInt INT64, " + + "requiredLong INT64 NOT NULL, " + + "optionalLong INT64, " + + "requiredDouble FLOAT64 NOT NULL, " + + "optionalDouble FLOAT64, " + + "requiredFloat FLOAT64 NOT NULL, " + + "optionalFloat FLOAT64, " + + "requiredString STRING NOT NULL, " + + "optionalString STRING, " + + "requiredBytes BYTES NOT NULL, " + + "optionalBytes BYTES, " + + "requiredEnum STRING NOT NULL, " + + "optionalEnum STRING"; + Assertions.assertEquals(expectedSqlSchema, schemaToSqlString(SCHEMA_RESOLVER.convertSchema(PRIMITIVE_TYPES))); + } + + @Test + void convertSchema_nestedFields() { + com.google.cloud.bigquery.Schema expected = com.google.cloud.bigquery.Schema.of( + Field.newBuilder("nestedOne", StandardSQLTypeName.STRUCT, + Field.newBuilder("nestedOptionalInt", 
StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("nestedRequiredDouble", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("nestedTwo", StandardSQLTypeName.STRUCT, + Field.newBuilder("doublyNestedString", StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build()).setMode(Field.Mode.REQUIRED).build()) + .setMode(Field.Mode.NULLABLE).build()); + + Assertions.assertEquals(expected, SCHEMA_RESOLVER.convertSchema(NESTED_FIELDS)); + } + + @Test + void convertSchemaToString_nestedFields() { + String expectedSqlSchema = "nestedOne STRUCT<" + + "nestedOptionalInt INT64, " + + "nestedRequiredDouble FLOAT64 NOT NULL, " + + "nestedTwo STRUCT<doublyNestedString STRING> NOT NULL>"; + Assertions.assertEquals(expectedSqlSchema, schemaToSqlString(SCHEMA_RESOLVER.convertSchema(NESTED_FIELDS))); + } + + @Test + void convertSchema_lists() { + Field intListField = Field.newBuilder("intList", StandardSQLTypeName.INT64).setMode(Field.Mode.REPEATED).build(); + + Field requiredDoubleField = Field.newBuilder("requiredDouble", StandardSQLTypeName.FLOAT64) + .setMode(Field.Mode.REQUIRED) + .build(); + Field optionalStringField = Field.newBuilder("optionalString", StandardSQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build(); + Field recordListField = Field.newBuilder("recordList", StandardSQLTypeName.STRUCT, + requiredDoubleField, optionalStringField).setMode(Field.Mode.REPEATED).build(); + + + com.google.cloud.bigquery.Schema expected = + com.google.cloud.bigquery.Schema.of(intListField, recordListField); + Assertions.assertEquals(expected, SCHEMA_RESOLVER.convertSchema(LISTS)); + } + + @Test + void convertSchemaToString_lists() { + String expectedSqlSchema = "intList ARRAY<INT64>, " + + "recordList ARRAY<STRUCT<requiredDouble FLOAT64 NOT NULL, optionalString STRING>>"; + Assertions.assertEquals(expectedSqlSchema, schemaToSqlString(SCHEMA_RESOLVER.convertSchema(LISTS))); + } + + @Test + void 
convertSchemaListOfNullableRecords() { + Schema nestedRecordType = SchemaBuilder.record("nested_record").fields().optionalString("inner_string_field").endRecord(); + Schema input = SchemaBuilder.record("top_level_schema") + .fields().name("top_level_schema_field") + .type() + .nullable() + .array() + .items(SchemaBuilder.unionOf().nullType().and().type(nestedRecordType).endUnion()) + .noDefault() + .endRecord(); + + Field innerStringField = Field.newBuilder("inner_string_field", StandardSQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build(); + Field topLevelSchemaField = Field.newBuilder("top_level_schema_field", StandardSQLTypeName.STRUCT, + innerStringField).setMode(Field.Mode.REPEATED).build(); + + com.google.cloud.bigquery.Schema expected = com.google.cloud.bigquery.Schema.of(topLevelSchemaField); + Assertions.assertEquals(expected, SCHEMA_RESOLVER.convertSchema(input)); + } + + @Test + void convertSchema_logicalTypes() { + String schemaString = "{\"type\":\"record\",\"name\":\"logicalTypes\",\"fields\":[{\"name\":\"int_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," + + "{\"name\":\"int_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"long_time_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"time-micros\"}}," + + "{\"name\":\"long_timestamp_millis\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}," + + "{\"name\":\"long_timestamp_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}}," + + "{\"name\":\"long_timestamp_millis_local\",\"type\":{\"type\":\"long\",\"logicalType\":\"local-timestamp-millis\"}}," + + "{\"name\":\"long_timestamp_micros_local\",\"type\":{\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}}," + + "{\"name\":\"bytes_decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\", \"precision\": 4, \"scale\": 2}}]}"; + Schema.Parser parser = new Schema.Parser(); + Schema input = parser.parse(schemaString); + + 
com.google.cloud.bigquery.Schema expected = com.google.cloud.bigquery.Schema.of( + Field.newBuilder("int_date", StandardSQLTypeName.DATE).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("int_time_millis", StandardSQLTypeName.TIME).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("long_time_micros", StandardSQLTypeName.TIME).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("long_timestamp_millis", StandardSQLTypeName.TIMESTAMP).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("long_timestamp_micros", StandardSQLTypeName.TIMESTAMP).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("long_timestamp_millis_local", StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("long_timestamp_micros_local", StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("bytes_decimal", StandardSQLTypeName.NUMERIC).setMode(Field.Mode.REQUIRED).build()); + + Assertions.assertEquals(expected, SCHEMA_RESOLVER.convertSchema(input)); + } + + @Test + void convertSchema_maps() { + Schema input = SchemaBuilder.record("testRecord") + .fields() + .name("intMap") + .type() + .map() + .values() + .intType().noDefault() + .name("recordMap") + .type() + .nullable() + .map() + .values(SchemaBuilder.record("element").fields().requiredDouble("requiredDouble").optionalString("optionalString").endRecord()) + .noDefault() + .endRecord(); + + + com.google.cloud.bigquery.Schema expected = com.google.cloud.bigquery.Schema.of( + Field.newBuilder("intMap", StandardSQLTypeName.STRUCT, + Field.newBuilder("key_value", StandardSQLTypeName.STRUCT, + Field.newBuilder("key", StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("value", StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build()) + .setMode(Field.Mode.REPEATED).build()) + .setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("recordMap", StandardSQLTypeName.STRUCT, + Field.newBuilder("key_value", 
StandardSQLTypeName.STRUCT, + Field.newBuilder("key", StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("value", StandardSQLTypeName.STRUCT, + Field.newBuilder("requiredDouble", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalString", StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build() + ).setMode(Field.Mode.REQUIRED).build()).setMode(Field.Mode.REPEATED).build()) + .setMode(Field.Mode.NULLABLE).build()); + + Assertions.assertEquals(expected, SCHEMA_RESOLVER.convertSchema(input)); + } + + @Test + void getTableSchema_withPartitionFields() throws Exception { + HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); + TableSchemaResolver mockTableSchemaResolver = mock(TableSchemaResolver.class); + when(mockTableSchemaResolver.getTableAvroSchema()).thenReturn(PRIMITIVE_TYPES); + BigQuerySchemaResolver resolver = new BigQuerySchemaResolver(metaClient -> mockTableSchemaResolver); + + com.google.cloud.bigquery.Schema expected = com.google.cloud.bigquery.Schema.of( + Field.newBuilder("requiredBoolean", StandardSQLTypeName.BOOL).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalBoolean", StandardSQLTypeName.BOOL).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredInt", StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalInt", StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredLong", StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalLong", StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredDouble", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalDouble", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredFloat", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.REQUIRED).build(), + 
Field.newBuilder("optionalFloat", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("optionalString", StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredBytes", StandardSQLTypeName.BYTES).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalBytes", StandardSQLTypeName.BYTES).setMode(Field.Mode.NULLABLE).build(), + Field.newBuilder("requiredEnum", StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("optionalEnum", StandardSQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build()); + + // expect 'requiredString' field to be removed + Assertions.assertEquals(expected, resolver.getTableSchema(mockMetaClient, Collections.singletonList("requiredString"))); + } + + @Test + void getTableSchema_withoutPartitionFields() throws Exception { + HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); + TableSchemaResolver mockTableSchemaResolver = mock(TableSchemaResolver.class); + when(mockTableSchemaResolver.getTableAvroSchema()).thenReturn(PRIMITIVE_TYPES); + when(mockTableSchemaResolver.getTableAvroSchema()).thenReturn(PRIMITIVE_TYPES); + BigQuerySchemaResolver resolver = new BigQuerySchemaResolver(metaClient -> mockTableSchemaResolver); + Assertions.assertEquals(PRIMITIVE_TYPES_BQ_SCHEMA, resolver.getTableSchema(mockMetaClient, Collections.emptyList())); + } +} diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java new file mode 100644 index 00000000000..5edbdac1c2e --- /dev/null +++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.gcp.bigquery; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.sync.common.util.ManifestFileWriter; + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +public class TestBigQuerySyncTool { + private static final String TEST_TABLE = "test_table"; + private final ManifestFileWriter mockManifestFileWriter = mock(ManifestFileWriter.class); + private final HoodieBigQuerySyncClient mockBqSyncClient = mock(HoodieBigQuerySyncClient.class); + private final BigQuerySchemaResolver mockBqSchemaResolver = mock(BigQuerySchemaResolver.class); + private final HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); + 
private final Properties properties = new Properties(); + + private final Schema schema = Schema.of(Field.of("id", StandardSQLTypeName.STRING)); + + @BeforeEach + void setup() { + // add default properties + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME.key(), TEST_TABLE); + } + + @Test + void missingDatasetCausesFailure() { + when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE); + when(mockBqSyncClient.datasetExists()).thenReturn(false); + BigQuerySyncTool tool = new BigQuerySyncTool(properties, mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver); + assertThrows(HoodieBigQuerySyncException.class, tool::syncHoodieTable); + verifyNoInteractions(mockManifestFileWriter, mockBqSchemaResolver); + } + + @Test + void useBQManifestFile_newTablePartitioned() { + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), "true"); + String prefix = "file:///local/prefix"; + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX.key(), prefix); + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(), "datestr,type"); + when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE); + when(mockBqSyncClient.datasetExists()).thenReturn(true); + when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false); + Path manifestPath = new Path("file:///local/path"); + when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath()); + when(mockBqSchemaResolver.getTableSchema(any(), eq(Arrays.asList("datestr", "type")))).thenReturn(schema); + BigQuerySyncTool tool = new BigQuerySyncTool(properties, mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver); + tool.syncHoodieTable(); + verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE, manifestPath.toUri().getPath(), prefix, schema); + verify(mockManifestFileWriter).writeManifestFile(true); + } + + @Test + 
void useBQManifestFile_newTableNonPartitioned() { + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), "true"); + when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE); + when(mockBqSyncClient.datasetExists()).thenReturn(true); + when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false); + Path manifestPath = new Path("file:///local/path"); + when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath()); + when(mockBqSchemaResolver.getTableSchema(any(), eq(Collections.emptyList()))).thenReturn(schema); + BigQuerySyncTool tool = new BigQuerySyncTool(properties, mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver); + tool.syncHoodieTable(); + verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE, manifestPath.toUri().getPath(), null, schema); + verify(mockManifestFileWriter).writeManifestFile(true); + } + + @Test + void useBQManifestFile_existingPartitionedTable() { + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), "true"); + String prefix = "file:///local/prefix"; + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX.key(), prefix); + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(), "datestr,type"); + when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE); + when(mockBqSyncClient.datasetExists()).thenReturn(true); + when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true); + Path manifestPath = new Path("file:///local/path"); + when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath()); + List<String> partitionFields = Arrays.asList("datestr", "type"); + when(mockBqSchemaResolver.getTableSchema(any(), eq(partitionFields))).thenReturn(schema); + BigQuerySyncTool tool = new BigQuerySyncTool(properties, mockManifestFileWriter, mockBqSyncClient, mockMetaClient, 
mockBqSchemaResolver); + tool.syncHoodieTable(); + verify(mockBqSyncClient).updateTableSchema(TEST_TABLE, schema, partitionFields); + verify(mockManifestFileWriter).writeManifestFile(true); + } + + @Test + void useBQManifestFile_existingNonPartitionedTable() { + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), "true"); + when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE); + when(mockBqSyncClient.datasetExists()).thenReturn(true); + when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true); + Path manifestPath = new Path("file:///local/path"); + when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath()); + when(mockBqSchemaResolver.getTableSchema(any(), eq(Collections.emptyList()))).thenReturn(schema); + BigQuerySyncTool tool = new BigQuerySyncTool(properties, mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver); + tool.syncHoodieTable(); + verify(mockBqSyncClient).updateTableSchema(TEST_TABLE, schema, Collections.emptyList()); + verify(mockManifestFileWriter).writeManifestFile(true); + } +} diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java new file mode 100644 index 00000000000..df7e6a9f31e --- /dev/null +++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.gcp.bigquery; + +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.sync.common.HoodieSyncConfig; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobStatus; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; + +import java.nio.file.Path; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestHoodieBigQuerySyncClient { + private static final String PROJECT_ID = "test_project"; + private static final String MANIFEST_FILE_URI = "file:/manifest_file"; + private static final String SOURCE_PREFIX = "file:/manifest_file/date=*"; + private static final String TEST_TABLE = "test_table"; + private static final String TEST_DATASET = "test_dataset"; + + static @TempDir Path tempDir; + + private static String basePath; + private final BigQuery mockBigQuery = 
mock(BigQuery.class); + private HoodieBigQuerySyncClient client; + + @BeforeAll + static void setupOnce() throws Exception { + basePath = tempDir.toString(); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(TEST_TABLE) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(new Configuration(), basePath); + } + + @BeforeEach + void setup() { + Properties properties = new Properties(); + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID); + properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), TEST_DATASET); + properties.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), tempDir.toString()); + BigQuerySyncConfig config = new BigQuerySyncConfig(properties); + client = new HoodieBigQuerySyncClient(config, mockBigQuery); + } + + @Test + void createTableWithManifestFile_partitioned() throws Exception { + Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING)); + ArgumentCaptor<JobInfo> jobInfoCaptor = ArgumentCaptor.forClass(JobInfo.class); + Job mockJob = mock(Job.class); + when(mockBigQuery.create(jobInfoCaptor.capture())).thenReturn(mockJob); + Job mockJobFinished = mock(Job.class); + when(mockJob.waitFor()).thenReturn(mockJobFinished); + JobStatus mockJobStatus = mock(JobStatus.class); + when(mockJobFinished.getStatus()).thenReturn(mockJobStatus); + when(mockJobStatus.getError()).thenReturn(null); + client.createTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, SOURCE_PREFIX, schema); + + QueryJobConfiguration configuration = jobInfoCaptor.getValue().getConfiguration(); + assertEquals(configuration.getQuery(), + String.format("CREATE EXTERNAL TABLE `%s.%s` ( field STRING ) WITH PARTITION COLUMNS OPTIONS (enable_list_inference=true, hive_partition_uri_prefix=\"%s\", uris=[\"%s\"], format=\"PARQUET\", " + + "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", TEST_DATASET, TEST_TABLE, SOURCE_PREFIX, MANIFEST_FILE_URI)); + 
} + + @Test + void createTableWithManifestFile_nonPartitioned() throws Exception { + Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING)); + ArgumentCaptor<JobInfo> jobInfoCaptor = ArgumentCaptor.forClass(JobInfo.class); + Job mockJob = mock(Job.class); + when(mockBigQuery.create(jobInfoCaptor.capture())).thenReturn(mockJob); + Job mockJobFinished = mock(Job.class); + when(mockJob.waitFor()).thenReturn(mockJobFinished); + JobStatus mockJobStatus = mock(JobStatus.class); + when(mockJobFinished.getStatus()).thenReturn(mockJobStatus); + when(mockJobStatus.getError()).thenReturn(null); + client.createTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, "", schema); + + QueryJobConfiguration configuration = jobInfoCaptor.getValue().getConfiguration(); + assertEquals(configuration.getQuery(), + String.format("CREATE EXTERNAL TABLE `%s.%s` ( field STRING ) OPTIONS (enable_list_inference=true, uris=[\"%s\"], format=\"PARQUET\", " + + "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", TEST_DATASET, TEST_TABLE, MANIFEST_FILE_URI)); + } +} diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java index e03388e1dba..442f796fdf6 100644 --- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java @@ -201,7 +201,7 @@ public class AdbSyncConfig extends HiveSyncConfig { props.setPropertyIfNonNull(ADB_SYNC_PASS.key(), hiveSyncConfigParams.hivePass); props.setPropertyIfNonNull(ADB_SYNC_JDBC_URL.key(), hiveSyncConfigParams.jdbcUrl); props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), hiveSyncConfigParams.hoodieSyncConfigParams.basePath); - props.setPropertyIfNonNull(META_SYNC_PARTITION_FIELDS.key(), String.join(",", hiveSyncConfigParams.hoodieSyncConfigParams.partitionFields)); + 
props.setPropertyIfNonNull(META_SYNC_PARTITION_FIELDS.key(), StringUtils.join(",", hiveSyncConfigParams.hoodieSyncConfigParams.partitionFields)); props.setPropertyIfNonNull(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), hiveSyncConfigParams.hoodieSyncConfigParams.partitionValueExtractorClass); props.setPropertyIfNonNull(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(hiveSyncConfigParams.hoodieSyncConfigParams.assumeDatePartitioning)); props.setPropertyIfNonNull(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(hiveSyncConfigParams.skipROSuffix)); diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 3eeb72f89e0..4c5fb01b9e7 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -83,6 +83,10 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto return metaClient.getTableConfig().getBootstrapBasePath().isPresent(); } + public HoodieTableMetaClient getMetaClient() { + return metaClient; + } + /** * Get the set of dropped partitions since the last synced commit. * If last sync time is not known then consider only active timeline. 
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java index c078884efc8..7090c194104 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java @@ -51,8 +51,8 @@ public class ManifestFileWriter { private final boolean useFileListingFromMetadata; private final boolean assumeDatePartitioning; - private ManifestFileWriter(Configuration hadoopConf, String basePath, boolean useFileListingFromMetadata, boolean assumeDatePartitioning) { - this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); + private ManifestFileWriter(HoodieTableMetaClient metaClient, boolean useFileListingFromMetadata, boolean assumeDatePartitioning) { + this.metaClient = metaClient; this.useFileListingFromMetadata = useFileListingFromMetadata; this.assumeDatePartitioning = assumeDatePartitioning; } @@ -122,21 +122,9 @@ public class ManifestFileWriter { * Builder for {@link ManifestFileWriter}. 
*/ public static class Builder { - - private Configuration conf; - private String basePath; private boolean useFileListingFromMetadata; private boolean assumeDatePartitioning; - - public Builder setConf(Configuration conf) { - this.conf = conf; - return this; - } - - public Builder setBasePath(String basePath) { - this.basePath = basePath; - return this; - } + private HoodieTableMetaClient metaClient; public Builder setUseFileListingFromMetadata(boolean useFileListingFromMetadata) { this.useFileListingFromMetadata = useFileListingFromMetadata; @@ -148,10 +136,14 @@ public class ManifestFileWriter { return this; } + public Builder setMetaClient(HoodieTableMetaClient metaClient) { + this.metaClient = metaClient; + return this; + } + public ManifestFileWriter build() { - ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init ManifestFileGenerator"); - ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init ManifestFileGenerator"); - return new ManifestFileWriter(conf, basePath, useFileListingFromMetadata, assumeDatePartitioning); + ValidationUtils.checkArgument(metaClient != null, "MetaClient needs to be set to init ManifestFileGenerator"); + return new ManifestFileWriter(metaClient, useFileListingFromMetadata, assumeDatePartitioning); } } } diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java index b01125853cb..85fd1ef4886 100644 --- a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java +++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java @@ -49,7 +49,7 @@ public class TestManifestFileWriter extends HoodieCommonTestHarness { public void testMultiLevelPartitionedTable() throws Exception { // Generate 10 files under each partition 
createTestDataForPartitionedTable(metaClient, 10); - ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build(); + ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setMetaClient(metaClient).build(); assertEquals(30, fetchLatestBaseFilesForAllPartitions(metaClient, false, false, false).count()); } @@ -57,7 +57,7 @@ public class TestManifestFileWriter extends HoodieCommonTestHarness { public void testCreateManifestFile() throws Exception { // Generate 10 files under each partition createTestDataForPartitionedTable(metaClient, 3); - ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build(); + ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setMetaClient(metaClient).build(); manifestFileWriter.writeManifestFile(false); Path manifestFilePath = manifestFileWriter.getManifestFilePath(false); try (InputStream is = metaClient.getFs().open(manifestFilePath)) { @@ -71,7 +71,7 @@ public class TestManifestFileWriter extends HoodieCommonTestHarness { public void testCreateManifestFileWithAbsolutePath() throws Exception { // Generate 10 files under each partition createTestDataForPartitionedTable(metaClient, 3); - ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build(); + ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setMetaClient(metaClient).build(); manifestFileWriter.writeManifestFile(true); Path manifestFilePath = manifestFileWriter.getManifestFilePath(true); try (InputStream is = metaClient.getFs().open(manifestFilePath)) { @@ -92,7 +92,7 @@ public class TestManifestFileWriter extends HoodieCommonTestHarness { @Test public void getManifestSourceUri() { - ManifestFileWriter manifestFileWriter = 
ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build(); + ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setMetaClient(metaClient).build(); String sourceUri = manifestFileWriter.getManifestSourceUri(false); assertEquals(new Path(basePath, ".hoodie/manifest/*").toUri().toString(), sourceUri);
