This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4f215e2 [HUDI-2057] CTAS Generate An External Table When Create
Managed Table (#3146)
4f215e2 is described below
commit 4f215e2938f78e13bea940013855cb14a6724601
Author: pengzhiwei <[email protected]>
AuthorDate: Sat Jul 3 15:55:36 2021 +0800
[HUDI-2057] CTAS Generate An External Table When Create Managed Table
(#3146)
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 6 ++++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 2 ++
.../command/CreateHoodieTableAsSelectCommand.scala | 4 +++
.../functional/HoodieSparkSqlWriterSuite.scala | 4 ++-
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 5 +++
.../org/apache/hudi/hive/util/HiveSchemaUtil.java | 7 ++++-
.../org/apache/hudi/hive/TestHiveSyncTool.java | 36 ++++++++++++++++++++++
8 files changed, 63 insertions(+), 3 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 3e6f7e4..8a68d3e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -456,6 +456,12 @@ object DataSourceWriteOptions {
.defaultValue("true")
.withDocumentation("")
+ // Create table as managed table
+ val HIVE_CREATE_MANAGED_TABLE: ConfigProperty[Boolean] = ConfigProperty
+ .key("hoodie.datasource.hive_sync.create_managed_table")
+ .defaultValue(false)
+ .withDocumentation("Whether to sync the table as managed table.")
+
// Async Compaction - Enabled by default for MOR
val ASYNC_COMPACT_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.compaction.async.enable")
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fd70a42..cecb9de 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -439,8 +439,8 @@ object HoodieSparkSqlWriter {
serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp)
-
}
+    hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
hiveSyncConfig
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index d3239e0..586e916 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -73,6 +73,8 @@ object HoodieWriterUtils {
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key ->
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.defaultValue,
HIVE_STYLE_PARTITIONING_OPT_KEY.key ->
HIVE_STYLE_PARTITIONING_OPT_KEY.defaultValue,
HIVE_USE_JDBC_OPT_KEY.key -> HIVE_USE_JDBC_OPT_KEY.defaultValue,
+      HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString,
+      HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(),
ASYNC_COMPACT_ENABLE_OPT_KEY.key ->
ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue,
ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue
) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index bdddb1c..64eff9a 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -69,6 +70,9 @@ case class CreateHoodieTableAsSelectCommand(
// Execute the insert query
try {
+      // Set if sync as a managed table.
+      sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key(),
+        (table.tableType == CatalogTableType.MANAGED).toString)
val success = InsertIntoHoodieTableCommand.run(sparkSession,
tableWithSchema, reOrderedQuery, Map.empty,
mode == SaveMode.Overwrite, refreshTable = false)
if (success) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index a9691c4..19d2a08 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -542,7 +542,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with
Matchers {
"path" -> basePath,
DataSourceWriteOptions.TABLE_NAME_OPT_KEY.key -> "test_hoodie",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY.key -> "partition",
- DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX.key -> "true"
+ DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX.key -> "true",
+ DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> "true"
)
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
@@ -559,6 +560,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with
Matchers {
new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig]
assertTrue(hiveSyncConfig.skipROSuffix)
+ assertTrue(hiveSyncConfig.createManagedTable)
assertResult("spark.sql.sources.provider=hudi\n" +
"spark.sql.sources.schema.partCol.0=partition\n" +
"spark.sql.sources.schema.numParts=1\n" +
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index c085aa0..3d4cc5b 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -104,6 +104,9 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--decode-partition"}, description = "Decode the
partition value if the partition has encoded during writing")
public Boolean decodePartition = false;
+  @Parameter(names = {"--managed-table"}, description = "Create a managed table")
+  public Boolean createManagedTable = false;
+
// enhance the similar function in child class
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -123,6 +126,7 @@ public class HiveSyncConfig implements Serializable {
newConfig.decodePartition = cfg.decodePartition;
newConfig.tableProperties = cfg.tableProperties;
newConfig.serdeProperties = cfg.serdeProperties;
+ newConfig.createManagedTable = cfg.createManagedTable;
return newConfig;
}
@@ -151,6 +155,7 @@ public class HiveSyncConfig implements Serializable {
+ ", help=" + help
+ ", supportTimestamp=" + supportTimestamp
+ ", decodePartition=" + decodePartition
+ + ", createManagedTable=" + createManagedTable
+ '}';
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 6eea716..7af54bb 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -413,7 +413,12 @@ public class HiveSchemaUtil {
}
String partitionsStr = String.join(",", partitionFields);
-    StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
+ StringBuilder sb = new StringBuilder();
+ if (config.createManagedTable) {
+ sb.append("CREATE TABLE IF NOT EXISTS ");
+ } else {
+ sb.append("CREATE EXTERNAL TABLE IF NOT EXISTS ");
+ }
sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
.append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
sb.append("( ").append(columns).append(")");
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 09bf756..f0e171b 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -66,6 +66,10 @@ public class TestHiveSyncTool {
return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false,
true}, {false, false}});
}
+  private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadataAndManagedTable() {
+    return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}});
+  }
+
@BeforeEach
public void setUp() throws Exception {
HiveTestUtil.setUp();
@@ -269,6 +273,38 @@ public class TestHiveSyncTool {
String ddl = String.join("\n", results);
assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
      assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'"));
+ assertTrue(ddl.toLowerCase().contains("create external table"));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource({"useJdbcAndSchemaFromCommitMetadataAndManagedTable"})
+ public void testSyncManagedTable(boolean useJdbc,
+ boolean useSchemaFromCommitMetadata,
+ boolean isManagedTable) throws Exception {
+ HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
+
+ hiveSyncConfig.useJdbc = useJdbc;
+ hiveSyncConfig.createManagedTable = isManagedTable;
+ String instantTime = "100";
+ HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
+
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ tool.syncHoodieTable();
+
+ SessionState.start(HiveTestUtil.getHiveConf());
+    Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf());
+    String dbTableName = hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName;
+ hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName);
+
+ List<String> results = new ArrayList<>();
+ hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
+ hiveDriver.getResults(results);
+ String ddl = String.join("\n", results).toLowerCase();
+ if (isManagedTable) {
+ assertTrue(ddl.contains("create table"));
+ } else {
+ assertTrue(ddl.contains("create external table"));
}
}