This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dd96129 [HUDI-2990] Sync to HMS when deleting partitions (#4291)
dd96129 is described below
commit dd961291914df656418c9a1cacd39eaf8a806809
Author: ForwardXu <[email protected]>
AuthorDate: Mon Dec 13 20:40:06 2021 +0800
[HUDI-2990] Sync to HMS when deleting partitions (#4291)
---
.../hudi/common/table/TableSchemaResolver.java | 19 +++++++
.../AlterHoodieTableDropPartitionCommand.scala | 24 ++++++--
.../java/org/apache/hudi/dla/HoodieDLAClient.java | 5 ++
.../java/org/apache/hudi/hive/HiveSyncTool.java | 64 ++++++++++++++--------
.../org/apache/hudi/hive/HoodieHiveClient.java | 33 +++++++++--
.../java/org/apache/hudi/hive/ddl/DDLExecutor.java | 8 +++
.../org/apache/hudi/hive/ddl/HMSDDLExecutor.java | 19 +++++++
.../apache/hudi/hive/ddl/HiveQueryDDLExecutor.java | 19 +++++++
.../org/apache/hudi/hive/ddl/JDBCExecutor.java | 8 +++
.../org/apache/hudi/hive/TestHiveSyncTool.java | 50 +++++++++++++++++
.../hudi/sync/common/AbstractSyncHoodieClient.java | 31 ++++++++++-
11 files changed, 244 insertions(+), 36 deletions(-)
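In short: when a table's latest commit is a DELETE_PARTITION operation, HiveSyncTool now skips schema sync and instead drops the corresponding partitions from the Hive Metastore. A minimal sketch of driving the sync directly (field names follow HiveSyncConfig as used in this patch; the "default"/"hudi_tbl" values and the hiveConf/fs variables are illustrative assumptions, not part of the commit):

    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.databaseName = "default";           // illustrative
    cfg.tableName = "hudi_tbl";             // illustrative
    cfg.basePath = "/tmp/hudi_tbl";         // illustrative
    cfg.syncMode = HiveSyncMode.HMS.name();
    // If the table's latest commit is a DELETE_PARTITION, syncHoodieTable()
    // now emits DROP partition events instead of re-syncing the schema.
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();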
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 51e3e27..d1a4d96 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -414,6 +414,25 @@ public class TableSchemaResolver {
return latestSchema;
}
+
+ /**
+ * Get Last commit's Metadata.
+ */
+ public Option<HoodieCommitMetadata> getLatestCommitMetadata() {
+ try {
+ HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ if (timeline.lastInstant().isPresent()) {
+ HoodieInstant instant = timeline.lastInstant().get();
+ byte[] data = timeline.getInstantDetails(instant).get();
+ return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
+ } else {
+ return Option.empty();
+ }
+ } catch (Exception e) {
+ throw new HoodieException("Failed to get commit metadata", e);
+ }
+ }
+
/**
* Read the parquet schema from a parquet File.
*/
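The new accessor above backs AbstractSyncHoodieClient.isDropPartition() later in this patch. A hedged standalone use (assumes an existing HoodieTableMetaClient named metaClient):

    Option<HoodieCommitMetadata> latest =
        new TableSchemaResolver(metaClient).getLatestCommitMetadata();
    // A DELETE_PARTITION operation type marks the commit as a partition drop.
    boolean isDrop = latest.isPresent()
        && WriteOperationType.DELETE_PARTITION.equals(latest.get().getOperationType());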
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
index a3dfdd6..1c295fb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
@@ -17,18 +17,19 @@
package org.apache.spark.sql.hudi.command
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-
-import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
+import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
case class AlterHoodieTableDropPartitionCommand(
tableIdentifier: TableIdentifier,
@@ -67,7 +68,8 @@ extends RunnableCommand {
val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
- val partitionsToDelete = normalizedSpecs.map { spec =>
+ val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+ val partitionsToDrop = normalizedSpecs.map { spec =>
hoodieCatalogTable.partitionFields.map{ partitionColumn =>
val encodedPartitionValue = if (enableEncodeUrl) {
PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
@@ -82,16 +84,26 @@ extends RunnableCommand {
}.mkString("/")
}.mkString(",")
+ val enableHive = isEnableHive(sparkSession)
withSparkConf(sparkSession, Map.empty) {
Map(
"path" -> hoodieCatalogTable.tableLocation,
TBL_NAME.key -> hoodieCatalogTable.tableName,
TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
- PARTITIONS_TO_DELETE.key -> partitionsToDelete,
+ PARTITIONS_TO_DELETE.key -> partitionsToDrop,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
- PARTITIONPATH_FIELD.key -> hoodieCatalogTable.partitionFields.mkString(",")
+ PARTITIONPATH_FIELD.key -> partitionFields,
+ HIVE_SYNC_ENABLED.key -> enableHive.toString,
+ META_SYNC_ENABLED.key -> enableHive.toString,
+ HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+ HIVE_USE_JDBC.key -> "false",
+ HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
+ HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
+ HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+ HIVE_PARTITION_FIELDS.key -> partitionFields,
+ HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName
)
}
}
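With the options above in place, a plain Spark SQL drop now propagates to the metastore via HMS whenever the session uses the hive catalog (the isEnableHive check above). A usage sketch (the table name and partition value are illustrative; spark is an existing SparkSession):

    spark.sql("ALTER TABLE h0 DROP PARTITION (dt='2021-12-01')");
    // The partition is deleted from the Hudi table and, through the newly
    // wired hive sync options, also dropped from the Hive Metastore.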
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
index 20f94f0..77d7362 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
@@ -287,6 +287,11 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
}
}
+ @Override
+ public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+ throw new UnsupportedOperationException("Not support dropPartitionsToTable yet.");
+ }
+
public Map<List<String>, String> scanTablePartitions(String tableName) {
String sql = constructShowPartitionSQL(tableName);
Statement stmt = null;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 3bbaee1..f07ab88 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -28,7 +28,6 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.util.ConfigUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
-
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncTool;
@@ -166,20 +165,28 @@ public class HiveSyncTool extends AbstractSyncTool {
// Check if the necessary table exists
boolean tableExists = hoodieHiveClient.doesTableExist(tableName);
- // Get the parquet schema for this table looking at the latest commit
- MessageType schema = hoodieHiveClient.getDataSchema();
-
- // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
- // so we disable the syncAsSparkDataSourceTable here to avoid read such kind table
- // by the data source way (which will use the HoodieBootstrapRelation).
- // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical.
- if (hoodieHiveClient.isBootstrap()
- && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
- && !readAsOptimized) {
- cfg.syncAsSparkDataSourceTable = false;
+ // check if isDropPartition
+ boolean isDropPartition = hoodieHiveClient.isDropPartition();
+
+ // check if schemaChanged
+ boolean schemaChanged = false;
+
+ if (!isDropPartition) {
+ // Get the parquet schema for this table looking at the latest commit
+ MessageType schema = hoodieHiveClient.getDataSchema();
+
+ // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
+ // so we disable the syncAsSparkDataSourceTable here to avoid read such kind table
+ // by the data source way (which will use the HoodieBootstrapRelation).
+ // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical.
+ if (hoodieHiveClient.isBootstrap()
+ && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
+ && !readAsOptimized) {
+ cfg.syncAsSparkDataSourceTable = false;
+ }
+ // Sync schema if needed
+ schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
}
- // Sync schema if needed
- boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
LOG.info("Schema sync complete. Syncing partitions for " + tableName);
// Get the last time we successfully synced partitions
@@ -192,7 +199,7 @@ public class HiveSyncTool extends AbstractSyncTool {
LOG.info("Storage partitions scan complete. Found " +
writtenPartitionsSince.size());
// Sync the partitions if needed
- boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince);
+ boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
boolean meetSyncConditions = schemaChanged || partitionsChanged;
if (!cfg.isConditionalSync || meetSyncConditions) {
hoodieHiveClient.updateLastCommitTimeSynced(tableName);
@@ -331,19 +338,32 @@ public class HiveSyncTool extends AbstractSyncTool {
* Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
* partition path does not match, it updates the partition path).
*/
- private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince) {
+ private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
boolean partitionsChanged;
try {
List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
List<PartitionEvent> partitionEvents =
- hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+ hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);
+
List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
- LOG.info("New Partitions " + newPartitions);
- hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
+ if (!newPartitions.isEmpty()) {
+ LOG.info("New Partitions " + newPartitions);
+ hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
+ }
+
List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
- LOG.info("Changed Partitions " + updatePartitions);
- hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
- partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty();
+ if (!updatePartitions.isEmpty()) {
+ LOG.info("Changed Partitions " + updatePartitions);
+ hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
+ }
+
+ List<String> dropPartitions = filterPartitions(partitionEvents, PartitionEventType.DROP);
+ if (!dropPartitions.isEmpty()) {
+ LOG.info("Drop Partitions " + dropPartitions);
+ hoodieHiveClient.dropPartitionsToTable(tableName, dropPartitions);
+ }
+
+ partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to sync partitions for table "
+ tableName, e);
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 265ab75..287de57 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -123,6 +123,14 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
}
/**
+ * Partition path has changed - drop the following partitions.
+ */
+ @Override
+ public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+ ddlExecutor.dropPartitionsToTable(tableName, partitionsToDrop);
+ }
+
+ /**
* Update the table properties to the table.
*/
@Override
@@ -147,6 +155,14 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
* Generate a list of PartitionEvent based on the changes required.
*/
List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
+ return getPartitionEvents(tablePartitions, partitionStoragePartitions, false);
+ }
+
+ /**
+ * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
+ * Generate a list of PartitionEvent based on the changes required.
+ */
+ List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
@@ -161,12 +177,17 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
- if (!storagePartitionValues.isEmpty()) {
- String storageValue = String.join(", ", storagePartitionValues);
- if (!paths.containsKey(storageValue)) {
- events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
- } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
- events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+
+ if (isDropPartition) {
+ events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
+ } else {
+ if (!storagePartitionValues.isEmpty()) {
+ String storageValue = String.join(", ", storagePartitionValues);
+ if (!paths.containsKey(storageValue)) {
+ events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+ } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+ events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+ }
}
}
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
index 0e1e223..dc37d92 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
@@ -81,5 +81,13 @@ public interface DDLExecutor {
*/
public void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+ /**
+ * Drop partitions for a given table.
+ *
+ * @param tableName
+ * @param partitionsToDrop
+ */
+ public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
+
public void close();
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index 37aa54a..d3efebe 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -227,6 +227,25 @@ public class HMSDDLExecutor implements DDLExecutor {
}
@Override
+ public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+ if (partitionsToDrop.isEmpty()) {
+ LOG.info("No partitions to drop for " + tableName);
+ return;
+ }
+
+ LOG.info("Drop partitions " + partitionsToDrop.size() + " on " +
tableName);
+ try {
+ for (String dropPartition : partitionsToDrop) {
+ client.dropPartition(syncConfig.databaseName, tableName, dropPartition, false);
+ LOG.info("Drop partition " + dropPartition + " on " + tableName);
+ }
+ } catch (TException e) {
+ LOG.error(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+ throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+ }
+ }
+
+ @Override
public void close() {
if (client != null) {
Hive.closeCurrent();
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
index e2635ee..7161194 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
@@ -127,6 +127,25 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
}
@Override
+ public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+ if (partitionsToDrop.isEmpty()) {
+ LOG.info("No partitions to drop for " + tableName);
+ return;
+ }
+
+ LOG.info("Drop partitions " + partitionsToDrop.size() + " on " +
tableName);
+ try {
+ for (String dropPartition : partitionsToDrop) {
+ metaStoreClient.dropPartition(config.databaseName, tableName, dropPartition, false);
+ LOG.info("Drop partition " + dropPartition + " on " + tableName);
+ }
+ } catch (Exception e) {
+ LOG.error(config.databaseName + "." + tableName + " drop partition failed", e);
+ throw new HoodieHiveSyncException(config.databaseName + "." + tableName + " drop partition failed", e);
+ }
+ }
+
+ @Override
public void close() {
if (metaStoreClient != null) {
Hive.closeCurrent();
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
index 1603191..493d4ee 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
@@ -32,6 +32,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -142,6 +143,13 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
}
@Override
+ public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+ partitionsToDrop.stream()
+ .map(partition -> String.format("ALTER TABLE `%s` DROP PARTITION
(%s)", tableName, partition))
+ .forEach(this::runSQL);
+ }
+
+ @Override
public void close() {
try {
if (connection != null) {
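For the JDBC path above, each partition string is interpolated into one DDL statement per partition and run through runSQL. Assuming a partition already rendered in Hive's key=value form (table and values illustrative), the generated statement would be:

    ALTER TABLE `hudi_tbl` DROP PARTITION (datestr='2050-01-01')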
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index d36727a..ef98641 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -739,6 +739,56 @@ public class TestHiveSyncTool {
@ParameterizedTest
@MethodSource("syncMode")
+ public void testDropPartitionKeySync(String syncMode) throws Exception {
+ hiveSyncConfig.syncMode = syncMode;
+ HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
+ String instantTime = "100";
+ HiveTestUtil.createCOWTable(instantTime, 1, true);
+ HoodieHiveClient hiveClient =
+ new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+ assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
+ "Table " + hiveSyncConfig.tableName + " should not exist initially");
+ // Lets do the sync
+ HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+ tool.syncHoodieTable();
+ // we need renew the hiveclient after tool.syncHoodieTable(), because it will close hive
+ // session, then lead to connection retry, we can see there is a exception at log.
+ hiveClient =
+ new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+ "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
+ assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+ hiveClient.getDataSchema().getColumns().size() + 1,
+ "Hive Schema should match the table schema + partition field");
+ assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+ "Table partitions should match the number of partitions we wrote");
+ assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+ "The last commit that was synced should be updated in the TBLPROPERTIES");
+
+ // Adding of new partitions
+ List<String> newPartition = Arrays.asList("2050/01/01");
+ hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, Arrays.asList());
+ assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+ "No new partition should be added");
+ hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, newPartition);
+ assertEquals(2, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+ "New partition should be added");
+
+ tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+ tool.syncHoodieTable();
+
+ // Drop 1 partition.
+ ddlExecutor.runSQL("ALTER TABLE `" + hiveSyncConfig.tableName
+ + "` DROP PARTITION (`datestr`='2050-01-01')");
+
+ hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+ List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+ assertEquals(1, hivePartitions.size(),
+ "Table should have 1 partition because of the drop 1 partition");
+ }
+
+ @ParameterizedTest
+ @MethodSource("syncMode")
public void testNonPartitionedSync(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index ce4720a..0a277be 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -20,16 +20,18 @@ package org.apache.hudi.sync.common;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -98,6 +100,8 @@ public abstract class AbstractSyncHoodieClient {
public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+ public abstract void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
+
public void updateTableProperties(String tableName, Map<String, String> tableProperties) {}
public abstract Map<String, String> getTableSchema(String tableName);
@@ -155,6 +159,25 @@ public abstract class AbstractSyncHoodieClient {
}
}
+ public boolean isDropPartition() {
+ try {
+ Option<HoodieCommitMetadata> hoodieCommitMetadata;
+ if (withOperationField) {
+ hoodieCommitMetadata = new TableSchemaResolver(metaClient, true).getLatestCommitMetadata();
+ } else {
+ hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata();
+ }
+
+ if (hoodieCommitMetadata.isPresent()
+ && hoodieCommitMetadata.get().getOperationType().equals(WriteOperationType.DELETE_PARTITION)) {
+ return true;
+ }
+ } catch (Exception e) {
+ throw new HoodieSyncException("Failed to read data schema", e);
+ }
+ return false;
+ }
+
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
@@ -224,7 +247,7 @@ public abstract class AbstractSyncHoodieClient {
public static class PartitionEvent {
public enum PartitionEventType {
- ADD, UPDATE
+ ADD, UPDATE, DROP
}
public PartitionEventType eventType;
@@ -242,5 +265,9 @@ public abstract class AbstractSyncHoodieClient {
public static PartitionEvent newPartitionUpdateEvent(String storagePartition) {
return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
}
+
+ public static PartitionEvent newPartitionDropEvent(String storagePartition) {
+ return new PartitionEvent(PartitionEventType.DROP, storagePartition);
+ }
}
}
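Note for downstream implementations: dropPartitionsToTable is added as an abstract method on AbstractSyncHoodieClient, so custom sync clients outside this patch must now implement it. A minimal sketch for a catalog without partition-drop support, mirroring the HoodieDLAClient change above (the surrounding class and its imports are assumed):

    @Override
    public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
      // Safe default for catalogs that cannot drop partitions yet.
      throw new UnsupportedOperationException("dropPartitionsToTable is not supported yet.");
    }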