vingov commented on a change in pull request #5125: URL: https://github.com/apache/hudi/pull/5125#discussion_r839193430
########## File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/BigQuerySyncTool.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.bigquery; + +import org.apache.hudi.bigquery.util.Utils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.InvalidTableException; +import org.apache.hudi.sync.common.AbstractSyncTool; + +import com.beust.jcommander.JCommander; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.Properties; + +/** + * Tool to sync a hoodie table with a big query table. 
Either use it as an api + * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line java -cp hoodie-hive.jar BigQuerySyncTool [args] + * <p> + * This utility will get the schema from the latest commit and will sync big query table schema + */ +public class BigQuerySyncTool extends AbstractSyncTool { + private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class); + public final BigQuerySyncConfig cfg; + public final HoodieBigQueryClient hoodieBigQueryClient; + public String projectId; + public String datasetName; + public String manifestTableName; + public String versionsTableName; + public String snapshotViewName; + public String sourceUri; + public String sourceUriPrefix; + public List<String> partitionFields; + + private BigQuerySyncTool(Properties properties, Configuration conf, FileSystem fs) { + super(new TypedProperties(properties), conf, fs); + hoodieBigQueryClient = new HoodieBigQueryClient(Utils.propertiesToConfig(properties), fs); + cfg = Utils.propertiesToConfig(properties); + switch (hoodieBigQueryClient.getTableType()) { + case COPY_ON_WRITE: + projectId = cfg.projectId; + datasetName = cfg.datasetName; + manifestTableName = cfg.tableName + "_manifest"; + versionsTableName = cfg.tableName + "_versions"; + snapshotViewName = cfg.tableName; + sourceUri = cfg.sourceUri; + sourceUriPrefix = cfg.sourceUriPrefix; + partitionFields = cfg.partitionFields; + break; + case MERGE_ON_READ: + LOG.error("Not supported table type " + hoodieBigQueryClient.getTableType()); + throw new InvalidTableException(hoodieBigQueryClient.getBasePath()); + default: + LOG.error("Unknown table type " + hoodieBigQueryClient.getTableType()); + throw new InvalidTableException(hoodieBigQueryClient.getBasePath()); + } + } + + public static void main(String[] args) { + // parse the params + BigQuerySyncConfig cfg = new BigQuerySyncConfig(); + JCommander cmd = new JCommander(cfg, null, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + 
System.exit(1); + } + FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration()); + new BigQuerySyncTool(Utils.configToProperties(cfg), new Configuration(), fs).syncHoodieTable(); + } + + @Override + public void syncHoodieTable() { + try { + switch (hoodieBigQueryClient.getTableType()) { + case COPY_ON_WRITE: + syncCoWTable(); + break; + case MERGE_ON_READ: + LOG.error("Not supported table type " + hoodieBigQueryClient.getTableType()); + throw new InvalidTableException(hoodieBigQueryClient.getBasePath()); + default: + LOG.error("Unknown table type " + hoodieBigQueryClient.getTableType()); + throw new InvalidTableException(hoodieBigQueryClient.getBasePath()); + } + } catch (RuntimeException re) { + throw new HoodieException("Got runtime exception when big query syncing " + cfg.tableName, re); + } finally { + hoodieBigQueryClient.close(); Review comment: Since the client is created in a different method, it's hard to use a try-with-resources block in this context. ########## File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/BigQuerySyncTool.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.bigquery; + +import org.apache.hudi.bigquery.util.Utils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.InvalidTableException; +import org.apache.hudi.sync.common.AbstractSyncTool; + +import com.beust.jcommander.JCommander; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.Properties; + +/** + * Tool to sync a hoodie table with a big query table. Either use it as an api + * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line java -cp hoodie-hive.jar BigQuerySyncTool [args] + * <p> + * This utility will get the schema from the latest commit and will sync big query table schema Review comment: Resolved. ########## File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/HoodieBigQueryClient.java ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.bigquery; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.CsvOptions; +import com.google.cloud.bigquery.ExternalTableDefinition; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.HivePartitioningOptions; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.ViewDefinition; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.schema.MessageType; + +import java.util.List; +import java.util.Map; + +public class HoodieBigQueryClient extends AbstractSyncHoodieClient { + private static final Logger LOG = LogManager.getLogger(HoodieBigQueryClient.class); + private transient BigQuery bigquery; + + public HoodieBigQueryClient(final BigQuerySyncConfig syncConfig, final FileSystem fs) { + super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, + false, fs); + this.createBigQueryConnection(); + } + + private void createBigQueryConnection() { + if (bigquery == null) { + try { + // Initialize client that will be used to send requests. This client only needs to be created + // once, and can be reused for multiple requests. 
+ bigquery = BigQueryOptions.getDefaultInstance().getService(); + LOG.info("Successfully established BigQuery connection."); + } catch (BigQueryException e) { + throw new HoodieException("Cannot create bigQuery connection ", e); + } + } + } + + @Override + public void createTable(final String tableName, final MessageType storageSchema, final String inputFormatClass, + final String outputFormatClass, final String serdeClass, + final Map<String, String> serdeProperties, final Map<String, String> tableProperties) { + // bigQuery create table arguments are different, so do nothing. + } + + public void createVersionsTable( + String projectId, String datasetName, String tableName, String sourceUri, String sourceUriPrefix, List<String> partitionFields) { + try { + ExternalTableDefinition customTable; + TableId tableId = TableId.of(projectId, datasetName, tableName); + + if (partitionFields != null) { Review comment: Resolved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
