satishkotha commented on a change in pull request #2254:
URL: https://github.com/apache/hudi/pull/2254#discussion_r540568019



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SparkDeletePartitionCommitActionExecutor<T extends 
HoodieRecordPayload<T>>
+    extends BaseSparkCommitActionExecutor<T> {
+
+  private List<String> partitions;
+  public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteConfig config, 
HoodieTable table,
+                                                  String instantTime, 
List<String> partitions) {
+    super(context, config, table, instantTime, 
WriteOperationType.DELETE_PARTITION);
+    this.partitions = partitions;
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+    Instant indexStartTime = Instant.now();
+    HoodieWriteMetadata result = new HoodieWriteMetadata();
+    result.setIndexUpdateDuration(Duration.between(indexStartTime, 
Instant.now()));
+    result.setWriteStatuses(jsc.emptyRDD());
+    Map<String, List<String>> partitionToReplaceFileIds = partitions.stream()

Review comment:
       Can you parallelize this? It is very similar to what you did for 
'insert_overwrite_table'. Try to reuse the code from there if possible.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -999,6 +999,110 @@ private void verifyInsertOverwritePartitionHandling(int 
batch1RecordsCount, int
     verifyRecordsWritten(commitTime2, inserts2, statuses);
   }
 
+  /**
+   * Test scenario of writing fewer file groups for first partition than 
second and third partition.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition() 
throws Exception {
+    verifyDeletePartitionsHandling(1000, 3000, 3000);
+  }
+
+  /**
+   * Test scenario of writing similar number file groups in partition.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() 
throws Exception {
+    verifyDeletePartitionsHandling(3000, 3000, 3000);
+  }
+
+  /**
+   * Test scenario of writing more file groups for first partition than second 
and third partition.
+   */
+  @Test
+  public void 
verifyDeletePartitionsHandlingHandlingWithFewerRecordsSecondThirdPartition() 
throws Exception {
+    verifyDeletePartitionsHandling(3000, 1000, 1000);
+  }
+
+  /**
+   *  1) Do write1 (upsert) with 'batch1RecordsCount' number of records for 
first partition.
+   *  2) Do write2 (upsert) with 'batch2RecordsCount' number of records for 
second partition.
+   *  3) Do write3 (upsert) with 'batch3RecordsCount' number of records for 
third partition.
+   *  4) delete first partition and check result.
+   *  5) delete second and third partition and check result.
+   *
+   */
+  private void verifyDeletePartitionsHandling(int batch1RecordsCount, int 
batch2RecordsCount, int batch3RecordsCount) throws Exception {
+    HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
+    SparkRDDWriteClient client = getHoodieWriteClient(config, false);
+    dataGen = new HoodieTestDataGenerator();
+
+    // Do Inserts for DEFAULT_FIRST_PARTITION_PATH
+    String commitTime1 = "001";
+    client.startCommitWithTime(commitTime1);
+    List<HoodieRecord> inserts1 = 
dataGen.generateInsertsForPartition(commitTime1, batch1RecordsCount, 
DEFAULT_FIRST_PARTITION_PATH);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, 
commitTime1).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> batch1Buckets = statuses.stream().map(s -> 
s.getFileId()).collect(Collectors.toSet());
+    verifyRecordsWritten(commitTime1, inserts1, statuses);
+
+    // Do Inserts for DEFAULT_SECOND_PARTITION_PATH
+    String commitTime2 = "002";

Review comment:
       Could you try to simplify this test to avoid code duplication? Also, see 
if it's possible to add a similar test for MOR tables.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -157,6 +157,15 @@ private synchronized FileSystemViewManager 
getViewManager() {
    */
   public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context, 
String instantTime, K keys);
 
+  /**
+   * Deletes all data of partitions.
+   * @param context    HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @param partitions   {@link List} of partition to be deleted
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext 
context, String instantTime, List<String> partitions);

Review comment:
       Do you think using JavaRDD<String> for partitions makes more sense 
(given all other APIs are using RDD)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to