This is an automated email from the ASF dual-hosted git repository.
satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e177466 [HUDI-1350] Support Partition level delete API in HUDI (#2254)
e177466 is described below
commit e177466fd266ebf3a8d371ce1bf2ecf3bdfe90ed
Author: lw0090 <[email protected]>
AuthorDate: Tue Dec 29 07:01:06 2020 +0800
[HUDI-1350] Support Partition level delete API in HUDI (#2254)
* [HUDI-1350] Support Partition level delete API in HUDI
* [HUDI-1350] Support Partition level delete API in HUDI based on InsertOverwriteCommitAction
---
.../java/org/apache/hudi/table/HoodieTable.java | 9 ++
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 5 +
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 5 +
.../apache/hudi/client/SparkRDDWriteClient.java | 7 ++
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 6 ++
.../SparkDeletePartitionCommitActionExecutor.java | 68 +++++++++++++
.../SparkInsertOverwriteCommitActionExecutor.java | 2 +-
...rkInsertOverwriteTableCommitActionExecutor.java | 6 --
.../TestHoodieClientOnCopyOnWriteStorage.java | 105 ++++++++++++++++++++-
.../hudi/testutils/HoodieClientTestUtils.java | 27 ++++--
.../hudi/common/model/WriteOperationType.java | 4 +
11 files changed, 228 insertions(+), 16 deletions(-)
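For orientation before the per-file diffs: partition-level delete is driven from SparkRDDWriteClient and is recorded on the timeline as a replacecommit. A minimal usage sketch, assuming an existing SparkRDDWriteClient named client and illustrative partition paths; it uses only calls introduced or exercised in this patch:

    // A partition delete is a replace action, so the commit is started
    // with the REPLACE_COMMIT_ACTION timeline action (as the new tests do).
    String commitTime = "005";
    client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
    HoodieWriteResult result =
        client.deletePartitions(Arrays.asList("2020/01/01", "2020/01/02"), commitTime);
    // Each deleted partition maps to the file IDs that the replacecommit
    // marks as replaced; no data files are rewritten.
    Map<String, List<String>> replaced = result.getPartitionToReplaceFileIds();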
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 36cd89a..6b7a7d2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -159,6 +159,15 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context, String instantTime, K keys);
 
   /**
+   * Deletes all data in the given partitions.
+   * @param context HoodieEngineContext
+   * @param instantTime Instant time for the action
+   * @param partitions {@link List} of partitions to be deleted
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
+
+  /**
    * Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
    * <p>
    * This implementation requires that the input records are already tagged, and de-duped if needed.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 3c4d7fb..d0cb8de 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -85,6 +85,11 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
+  public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
+  }
+
+  @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
     return new FlinkUpsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute();
   }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 7f65889..ddc995a 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -85,6 +85,11 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
   }
 
   @Override
+  public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    throw new HoodieNotSupportedException("Delete partitions is not supported yet");
+  }
+
+  @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
                                                               String instantTime,
                                                               List<HoodieRecord<T>> preppedRecords) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 18f5309..f7e7690 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -245,6 +245,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     return postWrite(result, instantTime, table);
   }
 
+  public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
+    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime);
+    setOperationType(WriteOperationType.DELETE_PARTITION);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
+    return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+  }
+
   @Override
   protected JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata<JavaRDD<WriteStatus>> result,
                                            String instantTime,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 71085a2..357b5ce 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -50,6 +50,7 @@ import org.apache.hudi.table.action.cluster.SparkClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.SparkDeletePartitionCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
@@ -109,6 +110,11 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
+  public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    return new SparkDeletePartitionCommitActionExecutor(context, config, this, instantTime, partitions).execute();
+  }
+
+  @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
     return new SparkUpsertPreppedCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
new file mode 100644
index 0000000..ea1ef51
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends SparkInsertOverwriteCommitActionExecutor<T> {
+
+  private List<String> partitions;
+
+  public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteConfig config, HoodieTable table,
+                                                  String instantTime, List<String> partitions) {
+    super(context, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
+    this.partitions = partitions;
+  }
+
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
+        .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+    HoodieWriteMetadata result = new HoodieWriteMetadata();
+    result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+    result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+    result.setWriteStatuses(jsc.emptyRDD());
+    this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
+    this.commitOnAutoCommit(result);
+    return result;
+  }
+}
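Note that this executor performs a metadata-only delete: it writes no new data files, records every latest file ID in each target partition as replaced, and emits an empty RDD of write statuses before committing via the auto-commit path. A small inspection sketch under those assumptions, where executor stands for an instance built with the constructor above:

    HoodieWriteMetadata<JavaRDD<WriteStatus>> metadata = executor.execute();
    // Deletion is expressed purely as replace metadata; nothing is rewritten.
    assert metadata.getWriteStatuses().isEmpty();
    // Partition path -> file IDs marked as replaced by this replacecommit.
    metadata.getPartitionToReplaceFileIds()
        .forEach((partition, fileIds) -> System.out.println(partition + " -> " + fileIds));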
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index 1e38220..c5d3c76 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -77,7 +77,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
         new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
   }
 
-  private List<String> getAllExistingFileIds(String partitionPath) {
+  protected List<String> getAllExistingFileIds(String partitionPath) {
     // Because the new commit is not yet complete, it is safe to mark all existing file IDs as old files.
     return table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
index e349657..c014515 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
@@ -36,7 +36,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends SparkInsertOverwriteCommitActionExecutor<T> {
@@ -47,11 +46,6 @@ public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecor
     super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
   }
 
-  protected List<String> getAllExistingFileIds(String partitionPath) {
-    return table.getSliceView().getLatestFileSlices(partitionPath)
-        .map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
-  }
-
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
     Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index c201efd..e86cb2d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1103,7 +1103,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    * Test scenario of writing a similar number of file groups in each partition.
    */
   @Test
-  public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords() throws Exception {
+  public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() throws Exception {
     verifyInsertOverwritePartitionHandling(3000, 3000);
   }
@@ -1144,6 +1144,109 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   /**
+   * Test scenario of writing fewer file groups for the first partition than the second and third partitions.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition() throws Exception {
+    verifyDeletePartitionsHandling(1000, 3000, 3000);
+  }
+
+  /**
+   * Test scenario of writing a similar number of file groups in each partition.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() throws Exception {
+    verifyDeletePartitionsHandling(3000, 3000, 3000);
+  }
+
+  /**
+   * Test scenario of writing more file groups for the first partition than the second and third partitions.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithFewerRecordsSecondThirdPartition() throws Exception {
+    verifyDeletePartitionsHandling(3000, 1000, 1000);
+  }
+
+  private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client, int recordsCount, String commitTime1, String partitionPath) {
+    client.startCommitWithTime(commitTime1);
+    List<HoodieRecord> inserts1 = dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> batchBuckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
+    verifyRecordsWritten(commitTime1, inserts1, statuses);
+    return batchBuckets;
+  }
+
+  private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
+    client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
+    Set<String> deletePartitionReplaceFileIds =
+        writeResult.getPartitionToReplaceFileIds().entrySet()
+            .stream().flatMap(entry -> entry.getValue().stream()).collect(Collectors.toSet());
+    return deletePartitionReplaceFileIds;
+  }
+
+  /**
+   * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for the first partition.
+   * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for the second partition.
+   * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for the third partition.
+   * 4) Delete the first partition and check the result.
+   * 5) Delete the second and third partitions and check the result.
+   */
+  private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount) throws Exception {
+    HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
+    SparkRDDWriteClient client = getHoodieWriteClient(config, false);
+    dataGen = new HoodieTestDataGenerator();
+
+    // Do inserts for DEFAULT_FIRST_PARTITION_PATH
+    String commitTime1 = "001";
+    Set<String> batch1Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch1RecordsCount, commitTime1, DEFAULT_FIRST_PARTITION_PATH);
+
+    // Do inserts for DEFAULT_SECOND_PARTITION_PATH
+    String commitTime2 = "002";
+    Set<String> batch2Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch2RecordsCount, commitTime2, DEFAULT_SECOND_PARTITION_PATH);
+
+    // Do inserts for DEFAULT_THIRD_PARTITION_PATH
+    String commitTime3 = "003";
+    Set<String> batch3Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch3RecordsCount, commitTime3, DEFAULT_THIRD_PARTITION_PATH);
+
+    // Delete DEFAULT_FIRST_PARTITION_PATH
+    String commitTime4 = "004";
+    Set<String> deletePartitionReplaceFileIds1 =
+        deletePartitionWithCommit(client, commitTime4, Arrays.asList(DEFAULT_FIRST_PARTITION_PATH));
+    assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
+    List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
+    assertEquals(0, baseFiles.size());
+    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH));
+    assertTrue(baseFiles.size() > 0);
+    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
+    assertTrue(baseFiles.size() > 0);
+
+    // Delete DEFAULT_SECOND_PARTITION_PATH and DEFAULT_THIRD_PARTITION_PATH
+    String commitTime5 = "005";
+    Set<String> deletePartitionReplaceFileIds2 =
+        deletePartitionWithCommit(client, commitTime5, Arrays.asList(DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH));
+    Set<String> expectedFileId = new HashSet<>();
+    expectedFileId.addAll(batch2Buckets);
+    expectedFileId.addAll(batch3Buckets);
+    assertEquals(expectedFileId, deletePartitionReplaceFileIds2);
+
+    baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH),
+        String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH),
+        String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
+    assertEquals(0, baseFiles.size());
+  }
+
+  /**
    * Verify data in parquet files matches expected records and commit time.
    */
   private void verifyRecordsWritten(String commitTime, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 307e068..c91b51b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -147,6 +147,22 @@ public class HoodieClientTestUtils {
     }
   }
 
+  public static List<HoodieBaseFile> getLatestBaseFiles(String basePath, FileSystem fs,
+                                                        String... paths) {
+    List<HoodieBaseFile> latestFiles = new ArrayList<>();
+    try {
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+      for (String path : paths) {
+        BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
+            metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
+        latestFiles.addAll(fileSystemView.getLatestBaseFiles().collect(Collectors.toList()));
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Error obtaining latest base files for the hoodie table", e);
+    }
+    return latestFiles;
+  }
+
   /**
    * Reads the paths under a hoodie table out as a DataFrame.
    */
@@ -154,14 +170,9 @@ public class HoodieClientTestUtils {
                                                  String... paths) {
     List<String> filteredPaths = new ArrayList<>();
     try {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
-      for (String path : paths) {
-        BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
-            metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
-        List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
-        for (HoodieBaseFile file : latestFiles) {
-          filteredPaths.add(file.getPath());
-        }
+      List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, fs, paths);
+      for (HoodieBaseFile file : latestFiles) {
+        filteredPaths.add(file.getPath());
       }
       return sqlContext.read().parquet(filteredPaths.toArray(new String[filteredPaths.size()]));
     } catch (Exception e) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 39f0f62..f237156 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -42,6 +42,8 @@ public enum WriteOperationType {
   INSERT_OVERWRITE("insert_overwrite"),
   // cluster
   CLUSTER("cluster"),
+  // delete partition
+  DELETE_PARTITION("delete_partition"),
   // insert overwrite with dynamic partitioning
   INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
   // used for old version
@@ -74,6 +76,8 @@ public enum WriteOperationType {
         return DELETE;
       case "insert_overwrite":
         return INSERT_OVERWRITE;
+      case "delete_partition":
+        return DELETE_PARTITION;
       case "insert_overwrite_table":
         return INSERT_OVERWRITE_TABLE;
       case "cluster":