This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4fc506f30fe1ee9011e9b10beb484e653c7518ac
Author: YueZhang <[email protected]>
AuthorDate: Fri Dec 31 15:56:33 2021 +0800

    [HUDI-3107] Fix HiveSyncTool drop partitions using JDBC, HiveQL, or HMS (#4453)
    
    * constructDropPartitions when dropping partitions using JDBC
    
    * done
    
    * done
    
    * code style
    
    * code review
    
    Co-authored-by: yuezhang <[email protected]>
---
 .../org/apache/hudi/hive/ddl/HMSDDLExecutor.java   |  4 +-
 .../apache/hudi/hive/ddl/HiveQueryDDLExecutor.java |  4 +-
 .../org/apache/hudi/hive/ddl/JDBCExecutor.java     | 51 ++++++++++++++++++++--
 .../hudi/hive/ddl/QueryBasedDDLExecutor.java       |  4 +-
 .../apache/hudi/hive/util/HivePartitionUtil.java   | 51 ++++++++++++++++++++++
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 42 ++++++++++++++++++
 .../apache/hudi/hive/testutils/HiveTestUtil.java   | 21 +++++++++
 7 files changed, 169 insertions(+), 8 deletions(-)
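
In short: the JDBC path previously issued one non-batched "ALTER TABLE `table` DROP PARTITION (...)" statement per partition, and the HMS/HiveQL paths passed the raw partition path straight to dropPartition. After this change, all three paths build an explicit partition clause, and the JDBC path batches the drops (config.batchSyncNum clauses per statement). A condensed, self-contained sketch of the batching idea follows; the class name, clause format, and values are illustrative, not the exact Hudi API:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DropPartitionBatchSketch {

  // Groups partition clauses into ALTER TABLE statements of at most batchSize
  // clauses each, mirroring JDBCExecutor#constructDropPartitions below.
  static List<String> constructDropPartitions(String table, List<String> clauses, int batchSize) {
    List<String> result = new ArrayList<>();
    StringBuilder sql = new StringBuilder("ALTER TABLE `" + table + "` DROP IF EXISTS");
    for (int i = 0; i < clauses.size(); i++) {
      // the first clause of each batch opens the list; later clauses are comma-separated
      sql.append(i % batchSize == 0 ? " PARTITION (" : ", PARTITION (")
          .append(clauses.get(i)).append(")");
      if ((i + 1) % batchSize == 0) {
        result.add(sql.toString());                // batch full: flush the statement
        sql = new StringBuilder("ALTER TABLE `" + table + "` DROP IF EXISTS");
      }
    }
    if (clauses.size() % batchSize != 0) {
      result.add(sql.toString());                  // flush the final partial batch
    }
    return result;
  }

  public static void main(String[] args) {
    constructDropPartitions("test_table",
        Arrays.asList("ds='2021-12-26'", "ds='2021-12-27'", "ds='2021-12-28'"), 2)
        .forEach(System.out::println);
    // ALTER TABLE `test_table` DROP IF EXISTS PARTITION (ds='2021-12-26'), PARTITION (ds='2021-12-27')
    // ALTER TABLE `test_table` DROP IF EXISTS PARTITION (ds='2021-12-28')
  }
}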

diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index d3efebe..c3c5226 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.PartitionValueExtractor;
+import org.apache.hudi.hive.util.HivePartitionUtil;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -236,7 +237,8 @@ public class HMSDDLExecutor implements DDLExecutor {
     LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + 
tableName);
     try {
       for (String dropPartition : partitionsToDrop) {
-        client.dropPartition(syncConfig.databaseName, tableName, 
dropPartition, false);
+        String partitionClause = 
HivePartitionUtil.getPartitionClauseForDrop(dropPartition, 
partitionValueExtractor, syncConfig);
+        client.dropPartition(syncConfig.databaseName, tableName, 
partitionClause, false);
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (TException e) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
index 7161194..a4debfb 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hudi.hive.util.HivePartitionUtil;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -136,7 +137,8 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
     LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
     try {
       for (String dropPartition : partitionsToDrop) {
-        metaStoreClient.dropPartition(config.databaseName, tableName, dropPartition, false);
+        String partitionClause = HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
+        metaStoreClient.dropPartition(config.databaseName, tableName, partitionClause, false);
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (Exception e) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
index 493d4ee..997d6e0 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hive.ddl;
 
+import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
+
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 
@@ -31,6 +33,7 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -144,9 +147,49 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
 
   @Override
   public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
-    partitionsToDrop.stream()
-        .map(partition -> String.format("ALTER TABLE `%s` DROP PARTITION (%s)", tableName, partition))
-        .forEach(this::runSQL);
+    if (partitionsToDrop.isEmpty()) {
+      LOG.info("No partitions to drop for " + tableName);
+      return;
+    }
+    LOG.info("Dropping " + partitionsToDrop.size() + " partitions from table " + tableName);
+    List<String> sqls = constructDropPartitions(tableName, partitionsToDrop);
+    sqls.forEach(this::runSQL);
+  }
+
+  private List<String> constructDropPartitions(String tableName, List<String> partitions) {
+    if (config.batchSyncNum <= 0) {
+      throw new HoodieHiveSyncException("batch-sync-num for syncing a Hive table must be greater than 0; please check the parameter");
+    }
+    List<String> result = new ArrayList<>();
+    int batchSyncPartitionNum = config.batchSyncNum;
+    StringBuilder alterSQL = getAlterTableDropPrefix(tableName);
+
+    for (int i = 0; i < partitions.size(); i++) {
+      String partitionClause = getPartitionClause(partitions.get(i));
+      // the first partition in each batch opens the clause list; later ones are comma-separated
+      if (i % batchSyncPartitionNum == 0) {
+        alterSQL.append(" PARTITION (").append(partitionClause).append(")");
+      } else {
+        alterSQL.append(", PARTITION (").append(partitionClause).append(")");
+      }
+
+      if ((i + 1) % batchSyncPartitionNum == 0) {
+        result.add(alterSQL.toString());
+        alterSQL = getAlterTableDropPrefix(tableName);
+      }
+    }
+    // add the remaining partitions to the result
+    if (partitions.size() % batchSyncPartitionNum != 0) {
+      result.add(alterSQL.toString());
+    }
+    return result;
+  }
+
+  public StringBuilder getAlterTableDropPrefix(String tableName) {
+    StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
+    alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName)
+        .append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
+        .append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" DROP IF EXISTS ");
+    return alterSQL;
   }
 
   @Override
@@ -159,4 +202,4 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
       LOG.error("Could not close connection ", e);
     }
   }
-}
+}
\ No newline at end of file
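
With HIVE_ESCAPE_CHARACTER being Hive's identifier-quoting backtick (as in the replaced String.format above), getAlterTableDropPrefix("test_table") yields a prefix of the form

    ALTER TABLE `default`.`test_table` DROP IF EXISTS 

(database name hypothetical), to which constructDropPartitions appends the comma-separated PARTITION (...) clauses.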
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index aed2bbe..a1cc772 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -46,7 +46,7 @@ import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
 public abstract class QueryBasedDDLExecutor implements DDLExecutor {
   private static final Logger LOG = LogManager.getLogger(QueryBasedDDLExecutor.class);
   private final HiveSyncConfig config;
-  private final PartitionValueExtractor partitionValueExtractor;
+  public final PartitionValueExtractor partitionValueExtractor;
   private final FileSystem fs;
 
   public QueryBasedDDLExecutor(HiveSyncConfig config, FileSystem fs) {
@@ -160,7 +160,7 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
     return alterSQL;
   }
 
-  private String getPartitionClause(String partition) {
+  public String getPartitionClause(String partition) {
     List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
     ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
         "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
new file mode 100644
index 0000000..27e3a73
--- /dev/null
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.util;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.PartitionValueExtractor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HivePartitionUtil {
+
+  /**
+   * Builds the partition clause for a DROP statement, e.g. year=2021/month=06/day=25.
+   */
+  public static String getPartitionClauseForDrop(String partition, PartitionValueExtractor partitionValueExtractor, HiveSyncConfig config) {
+    List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
+    ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
+            + ". Check partition strategy. ");
+    List<String> partBuilder = new ArrayList<>();
+    for (int i = 0; i < config.partitionFields.size(); i++) {
+      String partitionValue = partitionValues.get(i);
+      // decode the partition value before syncing to Hive, to prevent double escaping
+      if (config.decodePartition) {
+        // reverses the encoding applied in KeyGenUtils#getRecordPartitionPath
+        partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue);
+      }
+      partBuilder.add(config.partitionFields.get(i) + "=" + partitionValue);
+    }
+    return String.join("/", partBuilder);
+  }
+}
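
A worked example of the clause this helper builds, with a hypothetical extractor and values — assuming a MultiPartKeysValueExtractor and config.partitionFields = [year, month, day]:

    String clause = HivePartitionUtil.getPartitionClauseForDrop("2021/06/25", extractor, config);
    // clause -> "year=2021/month=06/day=25"

With config.decodePartition set, each value is first passed through PartitionPathEncodeUtils.unescapePathName before the clause is assembled.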
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index ef98641..eaca521 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hive;
 
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.NetworkTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -789,6 +790,47 @@ public class TestHiveSyncTool {
 
   @ParameterizedTest
   @MethodSource("syncMode")
+  public void testDropPartition(String syncMode) throws Exception {
+    hiveSyncConfig.syncMode = syncMode;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
+    String instantTime = "100";
+    HiveTestUtil.createCOWTable(instantTime, 1, true);
+
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
+        "Table " + hiveSyncConfig.tableName + " should not exist initially");
+    // Let's run the sync
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+    // We need to renew the Hive client after tool.syncHoodieTable(), because it closes the Hive
+    // session, which triggers a connection retry (visible as an exception in the log).
+    hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
+    assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+        hiveClient.getDataSchema().getColumns().size() + 1,
+        "Hive Schema should match the table schema + partition field");
+    assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");
+    // create a replace commit to delete the current partitions
+    HiveTestUtil.createReplaceCommit("101", "2021/12/28", WriteOperationType.DELETE_PARTITION);
+
+    // sync the partition drops
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+    assertEquals(0, hivePartitions.size(),
+        "Table should have no partitions after the only partition was dropped");
+  }
+
+  @ParameterizedTest
+  @MethodSource("syncMode")
   public void testNonPartitionedSync(String syncMode) throws Exception {
 
     hiveSyncConfig.syncMode = syncMode;
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index a3bc226..b54005b 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -28,8 +28,10 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -176,6 +178,16 @@ public class HiveTestUtil {
     createCommitFile(commitMetadata, instantTime);
   }
 
+  public static void createReplaceCommit(String instantTime, String partitions, WriteOperationType type)
+      throws IOException {
+    HoodieReplaceCommitMetadata replaceCommitMetadata = new HoodieReplaceCommitMetadata();
+    replaceCommitMetadata.setOperationType(type);
+    Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
+    partitionToReplaceFileIds.put(partitions, new ArrayList<>());
+    replaceCommitMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+    createReplaceCommitFile(replaceCommitMetadata, instantTime);
+  }
+
   public static void createCOWTableWithSchema(String instantTime, String schemaFileName)
       throws IOException, URISyntaxException {
     Path path = new Path(hiveSyncConfig.basePath);
@@ -442,6 +454,15 @@ public class HiveTestUtil {
     fsout.close();
   }
 
+  public static void createReplaceCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
+    byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
+    Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + HoodieTimeline.makeReplaceFileName(instantTime));
+    FSDataOutputStream fsout = fileSystem.create(fullPath, true);
+    fsout.write(bytes);
+    fsout.close();
+  }
+
   public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
     addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true);
     createCommitFile(commitMetadata, instantTime);
