This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4fc506f30fe1ee9011e9b10beb484e653c7518ac
Author: YueZhang <[email protected]>
AuthorDate: Fri Dec 31 15:56:33 2021 +0800

    [HUDI-3107] Fix HiveSyncTool drop partitions using JDBC, HiveQL, or HMS (#4453)

    * constructDropPartitions when dropping partitions using JDBC

    * done

    * done

    * code style

    * code review

    Co-authored-by: yuezhang <[email protected]>
---
 .../org/apache/hudi/hive/ddl/HMSDDLExecutor.java   |  4 +-
 .../apache/hudi/hive/ddl/HiveQueryDDLExecutor.java |  4 +-
 .../org/apache/hudi/hive/ddl/JDBCExecutor.java     | 51 ++++++++++++++++++++--
 .../hudi/hive/ddl/QueryBasedDDLExecutor.java       |  4 +-
 .../apache/hudi/hive/util/HivePartitionUtil.java   | 51 ++++++++++++++++++++++
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 42 ++++++++++++++++++
 .../apache/hudi/hive/testutils/HiveTestUtil.java   | 21 +++++++++
 7 files changed, 169 insertions(+), 8 deletions(-)

diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index d3efebe..c3c5226 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.PartitionValueExtractor;
+import org.apache.hudi.hive.util.HivePartitionUtil;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -236,7 +237,8 @@ public class HMSDDLExecutor implements DDLExecutor {
     LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
     try {
       for (String dropPartition : partitionsToDrop) {
-        client.dropPartition(syncConfig.databaseName, tableName, dropPartition, false);
+        String partitionClause = HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
+        client.dropPartition(syncConfig.databaseName, tableName, partitionClause, false);
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (TException e) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
index 7161194..a4debfb 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hudi.hive.util.HivePartitionUtil;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -136,7 +137,8 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
     LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
     try {
       for (String dropPartition : partitionsToDrop) {
-        metaStoreClient.dropPartition(config.databaseName, tableName, dropPartition, false);
+        String partitionClause = HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
+        metaStoreClient.dropPartition(config.databaseName, tableName, partitionClause, false);
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
on " + tableName); } } catch (Exception e) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java index 493d4ee..997d6e0 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java @@ -18,6 +18,8 @@ package org.apache.hudi.hive.ddl; +import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER; + import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; @@ -31,6 +33,7 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -144,9 +147,49 @@ public class JDBCExecutor extends QueryBasedDDLExecutor { @Override public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) { - partitionsToDrop.stream() - .map(partition -> String.format("ALTER TABLE `%s` DROP PARTITION (%s)", tableName, partition)) - .forEach(this::runSQL); + if (partitionsToDrop.isEmpty()) { + LOG.info("No partitions to add for " + tableName); + return; + } + LOG.info("Adding partitions " + partitionsToDrop.size() + " to table " + tableName); + List<String> sqls = constructDropPartitions(tableName, partitionsToDrop); + sqls.stream().forEach(sql -> runSQL(sql)); + } + + private List<String> constructDropPartitions(String tableName, List<String> partitions) { + if (config.batchSyncNum <= 0) { + throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter"); + } + List<String> result = new ArrayList<>(); + int batchSyncPartitionNum = config.batchSyncNum; + StringBuilder alterSQL = getAlterTableDropPrefix(tableName); + + for (int i = 0; i < partitions.size(); i++) { + String partitionClause = getPartitionClause(partitions.get(i)); + if (i == 0) { + alterSQL.append(" PARTITION (").append(partitionClause).append(")"); + } else { + alterSQL.append(", PARTITION (").append(partitionClause).append(")"); + } + + if ((i + 1) % batchSyncPartitionNum == 0) { + result.add(alterSQL.toString()); + alterSQL = getAlterTableDropPrefix(tableName); + } + } + // add left partitions to result + if (partitions.size() % batchSyncPartitionNum != 0) { + result.add(alterSQL.toString()); + } + return result; + } + + public StringBuilder getAlterTableDropPrefix(String tableName) { + StringBuilder alterSQL = new StringBuilder("ALTER TABLE "); + alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName) + .append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER) + .append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" DROP IF EXISTS "); + return alterSQL; } @Override @@ -159,4 +202,4 @@ public class JDBCExecutor extends QueryBasedDDLExecutor { LOG.error("Could not close connection ", e); } } -} +} \ No newline at end of file diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index aed2bbe..a1cc772 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -46,7 +46,7 @@ import static 
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index aed2bbe..a1cc772 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -46,7 +46,7 @@ import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
 public abstract class QueryBasedDDLExecutor implements DDLExecutor {
   private static final Logger LOG = LogManager.getLogger(QueryBasedDDLExecutor.class);
   private final HiveSyncConfig config;
-  private final PartitionValueExtractor partitionValueExtractor;
+  public final PartitionValueExtractor partitionValueExtractor;
   private final FileSystem fs;
 
   public QueryBasedDDLExecutor(HiveSyncConfig config, FileSystem fs) {
@@ -160,7 +160,7 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
     return alterSQL;
   }
 
-  private String getPartitionClause(String partition) {
+  public String getPartitionClause(String partition) {
     List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
     ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
         "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
new file mode 100644
index 0000000..27e3a73
--- /dev/null
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.util;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.PartitionValueExtractor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HivePartitionUtil {
+
+  /**
+   * Build a partition-spec string, e.g. year=2021/month=06/day=25.
+   */
+  public static String getPartitionClauseForDrop(String partition, PartitionValueExtractor partitionValueExtractor, HiveSyncConfig config) {
+    List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
+    ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
+            + ". Check partition strategy.");
+    List<String> partBuilder = new ArrayList<>();
+    for (int i = 0; i < config.partitionFields.size(); i++) {
+      String partitionValue = partitionValues.get(i);
+      // decode the partition value before syncing to Hive, so Hive does not escape it a second time
+      if (config.decodePartition) {
+        // this reverses the encoding applied in KeyGenUtils#getRecordPartitionPath
+        partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue);
+      }
+      partBuilder.add(config.partitionFields.get(i) + "=" + partitionValue);
+    }
+    return String.join("/", partBuilder);
+  }
+}
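The clause built here, e.g. year=2021/month=06/day=25 per the javadoc, is the string that HMSDDLExecutor and HiveQueryDDLExecutor now pass to dropPartition in place of the raw storage path. A minimal standalone sketch of its shape, assuming hypothetical partition fields year/month/day and pre-extracted values (in the patch the values come from the configured PartitionValueExtractor, with %xx escapes unescaped first when decodePartition is set):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch of the clause getPartitionClauseForDrop builds (hypothetical fields/values).
    public class PartitionClauseSketch {
      public static void main(String[] args) {
        List<String> fields = Arrays.asList("year", "month", "day");
        List<String> values = Arrays.asList("2021", "06", "25"); // as an extractor might return them
        List<String> partBuilder = new ArrayList<>();
        for (int i = 0; i < fields.size(); i++) {
          // with decodePartition=true, "%xx" escapes in the value would be unescaped here first
          partBuilder.add(fields.get(i) + "=" + values.get(i));
        }
        System.out.println(String.join("/", partBuilder)); // year=2021/month=06/day=25
      }
    }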
"); + List<String> partBuilder = new ArrayList<>(); + for (int i = 0; i < config.partitionFields.size(); i++) { + String partitionValue = partitionValues.get(i); + // decode the partition before sync to hive to prevent multiple escapes of HIVE + if (config.decodePartition) { + // This is a decode operator for encode in KeyGenUtils#getRecordPartitionPath + partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue); + } + partBuilder.add(config.partitionFields.get(i) + "=" + partitionValue); + } + return String.join("/", partBuilder); + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index ef98641..eaca521 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -20,6 +20,7 @@ package org.apache.hudi.hive; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.NetworkTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; @@ -789,6 +790,47 @@ public class TestHiveSyncTool { @ParameterizedTest @MethodSource("syncMode") + public void testDropPartition(String syncMode) throws Exception { + hiveSyncConfig.syncMode = syncMode; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; + String instantTime = "100"; + HiveTestUtil.createCOWTable(instantTime, 1, true); + + HoodieHiveClient hiveClient = + new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName), + "Table " + hiveSyncConfig.tableName + " should not exist initially"); + // Lets do the sync + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + tool.syncHoodieTable(); + // we need renew the hiveclient after tool.syncHoodieTable(), because it will close hive + // session, then lead to connection retry, we can see there is a exception at log. 
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index a3bc226..b54005b 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -28,8 +28,10 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -176,6 +178,16 @@ public class HiveTestUtil {
     createCommitFile(commitMetadata, instantTime);
   }
 
+  public static void createReplaceCommit(String instantTime, String partitions, WriteOperationType type)
+      throws IOException {
+    HoodieReplaceCommitMetadata replaceCommitMetadata = new HoodieReplaceCommitMetadata();
+    replaceCommitMetadata.setOperationType(type);
+    Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
+    partitionToReplaceFileIds.put(partitions, new ArrayList<>());
+    replaceCommitMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+    createReplaceCommitFile(replaceCommitMetadata, instantTime);
+  }
+
   public static void createCOWTableWithSchema(String instantTime, String schemaFileName)
       throws IOException, URISyntaxException {
     Path path = new Path(hiveSyncConfig.basePath);
@@ -442,6 +454,15 @@
     fsout.close();
   }
 
+  public static void createReplaceCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
+    byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
+    Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + HoodieTimeline.makeReplaceFileName(instantTime));
+    FSDataOutputStream fsout = fileSystem.create(fullPath, true);
+    fsout.write(bytes);
+    fsout.close();
+  }
+
   public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
     addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true);
     createCommitFile(commitMetadata, instantTime);
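A rough usage sketch of the two helpers above, assuming hudi-common on the classpath. The partition value mirrors the one testDropPartition drops; the printed file name is whatever HoodieTimeline.makeReplaceFileName returns for instant 101 (e.g. 101.replacecommit), and the JSON body is what createReplaceCommitFile writes into the metadata folder:

    import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
    import org.apache.hudi.common.model.WriteOperationType;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of the replace-commit metadata driving the partition drop in the test.
    public class ReplaceCommitSketch {
      public static void main(String[] args) throws Exception {
        HoodieReplaceCommitMetadata metadata = new HoodieReplaceCommitMetadata();
        metadata.setOperationType(WriteOperationType.DELETE_PARTITION);
        Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
        partitionToReplaceFileIds.put("2021/12/28", new ArrayList<>()); // partition dropped in the test
        metadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
        System.out.println(HoodieTimeline.makeReplaceFileName("101")); // timeline file name
        System.out.println(metadata.toJsonString());                   // JSON written by the helper
      }
    }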
