vingov commented on a change in pull request #5125:
URL: https://github.com/apache/hudi/pull/5125#discussion_r839193430



##########
File path: 
hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/BigQuerySyncTool.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.bigquery.util.Utils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Tool to sync a hoodie table with a big query table. Either use it as an api
+ * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line 
java -cp hoodie-hive.jar BigQuerySyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync big 
query table schema
+ */
+public class BigQuerySyncTool extends AbstractSyncTool {
+  private static final Logger LOG = 
LogManager.getLogger(BigQuerySyncTool.class);
+  public final BigQuerySyncConfig cfg;
+  public final HoodieBigQueryClient hoodieBigQueryClient;
+  public String projectId;
+  public String datasetName;
+  public String manifestTableName;
+  public String versionsTableName;
+  public String snapshotViewName;
+  public String sourceUri;
+  public String sourceUriPrefix;
+  public List<String> partitionFields;
+
+  private BigQuerySyncTool(Properties properties, Configuration conf, 
FileSystem fs) {
+    super(new TypedProperties(properties), conf, fs);
+    hoodieBigQueryClient = new 
HoodieBigQueryClient(Utils.propertiesToConfig(properties), fs);
+    cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieBigQueryClient.getTableType()) {
+      case COPY_ON_WRITE:
+        projectId = cfg.projectId;
+        datasetName = cfg.datasetName;
+        manifestTableName = cfg.tableName + "_manifest";
+        versionsTableName = cfg.tableName + "_versions";
+        snapshotViewName = cfg.tableName;
+        sourceUri = cfg.sourceUri;
+        sourceUriPrefix = cfg.sourceUriPrefix;
+        partitionFields = cfg.partitionFields;
+        break;
+      case MERGE_ON_READ:
+        LOG.error("Not supported table type " + 
hoodieBigQueryClient.getTableType());
+        throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+      default:
+        LOG.error("Unknown table type " + hoodieBigQueryClient.getTableType());
+        throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+    }
+  }
+
+  public static void main(String[] args) {
+    // parse the params
+    BigQuerySyncConfig cfg = new BigQuerySyncConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    new BigQuerySyncTool(Utils.configToProperties(cfg), new Configuration(), 
fs).syncHoodieTable();
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieBigQueryClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncCoWTable();
+          break;
+        case MERGE_ON_READ:
+          LOG.error("Not supported table type " + 
hoodieBigQueryClient.getTableType());
+          throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+        default:
+          LOG.error("Unknown table type " + 
hoodieBigQueryClient.getTableType());
+          throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+      }
+    } catch (RuntimeException re) {
+      throw new HoodieException("Got runtime exception when big query syncing 
" + cfg.tableName, re);
+    } finally {
+      hoodieBigQueryClient.close();

Review comment:
       Since the client is created in a different method, it's hard to use a
try-with-resources block in this context.

##########
File path: 
hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/BigQuerySyncTool.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.bigquery.util.Utils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Tool to sync a hoodie table with a big query table. Either use it as an api
+ * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line 
java -cp hoodie-hive.jar BigQuerySyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync big 
query table schema

Review comment:
       Resolved.

##########
File path: 
hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/HoodieBigQueryClient.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.CsvOptions;
+import com.google.cloud.bigquery.ExternalTableDefinition;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.HivePartitioningOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.ViewDefinition;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+
+public class HoodieBigQueryClient extends AbstractSyncHoodieClient {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBigQueryClient.class);
+  private transient BigQuery bigquery;
+
+  public HoodieBigQueryClient(final BigQuerySyncConfig syncConfig, final 
FileSystem fs) {
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, 
syncConfig.useFileListingFromMetadata,
+        false, fs);
+    this.createBigQueryConnection();
+  }
+
+  private void createBigQueryConnection() {
+    if (bigquery == null) {
+      try {
+        // Initialize client that will be used to send requests. This client 
only needs to be created
+        // once, and can be reused for multiple requests.
+        bigquery = BigQueryOptions.getDefaultInstance().getService();
+        LOG.info("Successfully established BigQuery connection.");
+      } catch (BigQueryException e) {
+        throw new HoodieException("Cannot create bigQuery connection ", e);
+      }
+    }
+  }
+
+  @Override
+  public void createTable(final String tableName, final MessageType 
storageSchema, final String inputFormatClass,
+                          final String outputFormatClass, final String 
serdeClass,
+                          final Map<String, String> serdeProperties, final 
Map<String, String> tableProperties) {
+    // bigQuery create table arguments are different, so do nothing.
+  }
+
+  public void createVersionsTable(
+      String projectId, String datasetName, String tableName, String 
sourceUri, String sourceUriPrefix, List<String> partitionFields) {
+    try {
+      ExternalTableDefinition customTable;
+      TableId tableId = TableId.of(projectId, datasetName, tableName);
+
+      if (partitionFields != null) {

Review comment:
       Resolved.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to