This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f215e2  [HUDI-2057]  CTAS Generate An External Table When Create Managed Table (#3146)
4f215e2 is described below

commit 4f215e2938f78e13bea940013855cb14a6724601
Author: pengzhiwei <[email protected]>
AuthorDate: Sat Jul 3 15:55:36 2021 +0800

    [HUDI-2057]  CTAS Generate An External Table When Create Managed Table (#3146)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  6 ++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  2 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  2 ++
 .../command/CreateHoodieTableAsSelectCommand.scala |  4 +++
 .../functional/HoodieSparkSqlWriterSuite.scala     |  4 ++-
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  |  5 +++
 .../org/apache/hudi/hive/util/HiveSchemaUtil.java  |  7 ++++-
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 36 ++++++++++++++++++++++
 8 files changed, 63 insertions(+), 3 deletions(-)

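For anyone who wants to exercise the new option end to end, here is a minimal
Scala sketch of a datasource write with Hive sync creating a managed table.
The table name, field names, and paths are placeholders, and the surrounding
SparkSession/DataFrame setup is assumed, not part of this commit:

    // Write a DataFrame as a Hudi table; with the new flag set, Hive sync
    // issues CREATE TABLE (managed) instead of CREATE EXTERNAL TABLE.
    df.write.format("hudi")
      .option("hoodie.table.name", "test_hoodie")                          // placeholder
      .option("hoodie.datasource.write.recordkey.field", "id")             // placeholder
      .option("hoodie.datasource.write.precombine.field", "ts")            // placeholder
      .option("hoodie.datasource.hive_sync.enable", "true")
      .option("hoodie.datasource.hive_sync.create_managed_table", "true")  // new in this commit
      .mode("append")
      .save("/tmp/test_hoodie")                                            // placeholder base path
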
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 3e6f7e4..8a68d3e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -456,6 +456,12 @@ object DataSourceWriteOptions {
     .defaultValue("true")
     .withDocumentation("")
 
+  // Create table as managed table
+  val HIVE_CREATE_MANAGED_TABLE: ConfigProperty[Boolean] = ConfigProperty
+    .key("hoodie.datasource.hive_sync.create_managed_table")
+    .defaultValue(false)
+    .withDocumentation("Whether to sync the table as managed table.")
+
   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.compaction.async.enable")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fd70a42..cecb9de 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -439,8 +439,8 @@ object HoodieSparkSqlWriter {
      serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
 
       hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp)
-
     }
+    hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
     hiveSyncConfig
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index d3239e0..586e916 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -73,6 +73,8 @@ object HoodieWriterUtils {
      HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key -> HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.defaultValue,
      HIVE_STYLE_PARTITIONING_OPT_KEY.key -> HIVE_STYLE_PARTITIONING_OPT_KEY.defaultValue,
      HIVE_USE_JDBC_OPT_KEY.key -> HIVE_USE_JDBC_OPT_KEY.defaultValue,
+      HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString,
+      HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(),
      ASYNC_COMPACT_ENABLE_OPT_KEY.key -> ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue,
       ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue
     ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index bdddb1c..64eff9a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -69,6 +70,9 @@ case class CreateHoodieTableAsSelectCommand(
 
     // Execute the insert query
     try {
+      // Set if sync as a managed table.
+      sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key(),
+        (table.tableType == CatalogTableType.MANAGED).toString)
      val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty,
         mode == SaveMode.Overwrite, refreshTable = false)
       if (success) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index a9691c4..19d2a08 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -542,7 +542,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       "path" -> basePath,
       DataSourceWriteOptions.TABLE_NAME_OPT_KEY.key -> "test_hoodie",
       DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY.key -> "partition",
-      DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX.key -> "true"
+      DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX.key -> "true",
+      DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> "true"
     )
     val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
     val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
@@ -559,6 +560,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig]
 
     assertTrue(hiveSyncConfig.skipROSuffix)
+    assertTrue(hiveSyncConfig.createManagedTable)
     assertResult("spark.sql.sources.provider=hudi\n" +
       "spark.sql.sources.schema.partCol.0=partition\n" +
       "spark.sql.sources.schema.numParts=1\n" +
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index c085aa0..3d4cc5b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -104,6 +104,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--decode-partition"}, description = "Decode the 
partition value if the partition has encoded during writing")
   public Boolean decodePartition = false;
 
+  @Parameter(names = {"--managed-table"}, description = "Create a managed 
table")
+  public Boolean createManagedTable = false;
+
   // enhance the similar function in child class
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -123,6 +126,7 @@ public class HiveSyncConfig implements Serializable {
     newConfig.decodePartition = cfg.decodePartition;
     newConfig.tableProperties = cfg.tableProperties;
     newConfig.serdeProperties = cfg.serdeProperties;
+    newConfig.createManagedTable = cfg.createManagedTable;
     return newConfig;
   }
 
@@ -151,6 +155,7 @@ public class HiveSyncConfig implements Serializable {
       + ", help=" + help
       + ", supportTimestamp=" + supportTimestamp
       + ", decodePartition=" + decodePartition
+      + ", createManagedTable=" + createManagedTable
       + '}';
   }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 6eea716..7af54bb 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -413,7 +413,12 @@ public class HiveSchemaUtil {
     }
 
     String partitionsStr = String.join(",", partitionFields);
-    StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE  IF NOT EXISTS ");
+    StringBuilder sb = new StringBuilder();
+    if (config.createManagedTable) {
+      sb.append("CREATE TABLE IF NOT EXISTS ");
+    } else {
+      sb.append("CREATE EXTERNAL TABLE IF NOT EXISTS ");
+    }
     sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
             .append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
     sb.append("( ").append(columns).append(")");
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 09bf756..f0e171b 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -66,6 +66,10 @@ public class TestHiveSyncTool {
     return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}});
   }
 
+  private static Iterable<Object[]> 
useJdbcAndSchemaFromCommitMetadataAndManagedTable() {
+    return Arrays.asList(new Object[][] {{true, true, true}, {true, false, 
false}, {false, true, true}, {false, false, false}});
+  }
+
   @BeforeEach
   public void setUp() throws Exception {
     HiveTestUtil.setUp();
@@ -269,6 +273,38 @@ public class TestHiveSyncTool {
       String ddl = String.join("\n", results);
       assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
       assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + 
expectQueryType + "'"));
+      assertTrue(ddl.toLowerCase().contains("create external table"));
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource({"useJdbcAndSchemaFromCommitMetadataAndManagedTable"})
+  public void testSyncManagedTable(boolean useJdbc,
+                                   boolean useSchemaFromCommitMetadata,
+                                   boolean isManagedTable) throws Exception {
+    HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
+
+    hiveSyncConfig.useJdbc = useJdbc;
+    hiveSyncConfig.createManagedTable = isManagedTable;
+    String instantTime = "100";
+    HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
+
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    SessionState.start(HiveTestUtil.getHiveConf());
+    Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf());
+    String dbTableName = hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName;
+    hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName);
+
+    List<String> results = new ArrayList<>();
+    hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
+    hiveDriver.getResults(results);
+    String ddl = String.join("\n", results).toLowerCase();
+    if (isManagedTable) {
+      assertTrue(ddl.contains("create table"));
+    } else {
+      assertTrue(ddl.contains("create external table"));
     }
   }
 
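The same switch is exposed on the standalone sync path via the new
--managed-table parameter above. A minimal Scala sketch of driving
HiveSyncTool programmatically (database, table, path, and JDBC URL are
placeholders; hiveConf and fs are assumed to be an already-configured
HiveConf and FileSystem, not part of this commit):

    import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}

    val cfg = new HiveSyncConfig()
    cfg.databaseName = "default"                    // placeholder
    cfg.tableName = "test_hoodie"                   // placeholder
    cfg.basePath = "/tmp/test_hoodie"               // placeholder
    cfg.jdbcUrl = "jdbc:hive2://localhost:10000"    // placeholder
    cfg.createManagedTable = true                   // new flag from this commit
    // Same constructor the new test above uses.
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable()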
