This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 14dbbdf [HUDI-2189] Adding delete partitions support to DeltaStreamer (#4787)
14dbbdf is described below
commit 14dbbdf4c7a45ab5a10889f8e558984455315829
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Feb 22 00:01:30 2022 -0500
[HUDI-2189] Adding delete partitions support to DeltaStreamer (#4787)
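
For anyone wiring this up outside the test suite, the sketch below shows, under stated assumptions, how a DeltaStreamer run could be pointed at the new operation. The Config field names follow HoodieDeltaStreamer.Config as exercised by TestHelpers.makeConfig in the diff; the paths, table name, and properties file are placeholders, not values from this commit.

    // Hedged sketch: a DeltaStreamer run using the newly supported operation.
    // Paths, table name, and properties file below are placeholders.
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
    cfg.targetBasePath = "/tmp/hudi/demo_table";          // placeholder
    cfg.targetTableName = "demo_table";                   // placeholder
    cfg.tableType = "COPY_ON_WRITE";
    cfg.sourceClassName = ParquetDFSSource.class.getName();
    cfg.propsFilePath = "/tmp/hudi/demo.properties";      // placeholder
    cfg.sourceOrderingField = "timestamp";
    cfg.operation = WriteOperationType.DELETE_PARTITION;  // DeltaStreamer support added by this commit
    new HoodieDeltaStreamer(cfg, jssc).sync();            // jssc: an existing JavaSparkContext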
---
.../hudi/utilities/deltastreamer/DeltaSync.java | 4 ++
.../functional/HoodieDeltaStreamerTestBase.java | 2 +-
.../functional/TestHoodieDeltaStreamer.java | 57 +++++++++++++++++++++-
3 files changed, 61 insertions(+), 2 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c376243..082a9b1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -585,6 +585,10 @@ public class DeltaSync implements Serializable {
       case INSERT_OVERWRITE_TABLE:
         writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
         break;
+      case DELETE_PARTITION:
+        List<String> partitions = records.map(record -> record.getPartitionPath()).distinct().collect();
+        writeStatusRDD = writeClient.deletePartitions(partitions, instantTime).getWriteStatuses();
+        break;
       default:
         throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation);
     }
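
The added branch collects the distinct partition paths seen in the incoming batch and hands them to the write client; Hudi carries out deletePartitions as a replace action, so file groups in those partitions are marked replaced rather than rewritten. A condensed restatement of the branch, assuming the surrounding DeltaSync context (writeClient, a JavaRDD<HoodieRecord> named records, and an already started instantTime):

    // Same logic as the hunk above, with the intermediate steps spelled out.
    List<String> partitions = records
        .map(HoodieRecord::getPartitionPath)  // one partition path per record
        .distinct()                           // de-duplicate across the batch
        .collect();                           // partition lists are small; fine on the driver
    JavaRDD<WriteStatus> writeStatusRDD =
        writeClient.deletePartitions(partitions, instantTime).getWriteStatuses();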
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
index 9b7ee3b..02b1848 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
@@ -173,7 +173,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     props.setProperty("include", "sql-transformer.properties");
     props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
     props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
     props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index a6fdf00..1c80896 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -272,6 +272,19 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       assertEquals(expected, recordCount);
     }
+    static Map<String, Long> getPartitionRecordCount(String basePath, SQLContext sqlContext) {
+      sqlContext.clearCache();
+      List<Row> rows = sqlContext.read().format("org.apache.hudi").load(basePath).groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD).count().collectAsList();
+      Map<String, Long> partitionRecordCount = new HashMap<>();
+      rows.stream().forEach(row -> partitionRecordCount.put(row.getString(0), row.getLong(1)));
+      return partitionRecordCount;
+    }
+
+    static void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String partitionToValidate) {
+      sqlContext.clearCache();
+      assertEquals(0, sqlContext.read().format("org.apache.hudi").load(basePath).filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = '" + partitionToValidate + "'").count());
+    }
+
     static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
       sqlContext.clearCache();
       long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count();
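
The two helpers above are reusable for other partition-level assertions. A hypothetical usage, assuming a synced table at tableBasePath and the sqlContext these tests already hold; nothing below is part of this commit:

    // Hypothetical usage of the new helpers.
    Map<String, Long> counts = TestHelpers.getPartitionRecordCount(tableBasePath, sqlContext);
    long total = counts.values().stream().mapToLong(Long::longValue).sum();
    assertEquals(PARQUET_NUM_RECORDS, total); // every record lands in some partition
    // After a DELETE_PARTITION sync, the targeted partition should be empty:
    TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext,
        HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);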
@@ -1378,6 +1391,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                        String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
+        "not_there");
+  }
+
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                       String propsFileName, String parquetSourceRoot, boolean addCommonProps,
+                                       String partitionPath) throws IOException {
     // Properties used for testing delta-streamer with Parquet source
     TypedProperties parquetProps = new TypedProperties();
@@ -1388,7 +1408,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     parquetProps.setProperty("include", "base.properties");
     parquetProps.setProperty("hoodie.embed.timeline.server", "false");
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
     if (useSchemaProvider) {
       parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
       if (hasTransformer) {
@@ -1855,6 +1875,31 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
   }
+  @Test
+  public void testDeletePartitions() throws Exception {
+    prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
+    String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+            null, PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
+    testNum++;
+
+    prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
+    prepareParquetDFSSource(false, false);
+    // set the write operation to DELETE_PARTITION and add a transformer that keeps only records in partition HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
+    deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(),
+            Collections.singletonList(TestSpecificPartitionTransformer.class.getName()), PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    // No records should remain in HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH.
+    TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+  }
+
   void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
     // Initial insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
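
Since DELETE_PARTITION is executed as a replace action, the test above could additionally assert that a replacecommit landed on the timeline. A hedged sketch using the HoodieTableMetaClient API, which is not part of this commit and whose timeline accessor should be double-checked against the Hudi version in use:

    // Hedged sketch: confirm the partition delete produced a replacecommit.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(jsc.hadoopConfiguration())
        .setBasePath(tableBasePath)
        .build();
    assertTrue(metaClient.getActiveTimeline()
        .getCompletedReplaceTimeline().countInstants() >= 1);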
@@ -2001,6 +2046,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     }
   }
+  public static class TestSpecificPartitionTransformer implements Transformer {
+
+    @Override
+    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
+                              TypedProperties properties) {
+      Dataset<Row> toReturn = rowDataset.filter("partition_path == '" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'");
+      return toReturn;
+    }
+  }
+
   /**
    * Add new field evoluted_optional_union_field with value of the field rider.
    */
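
TestSpecificPartitionTransformer hard-codes the partition it keeps. Where the same pattern is needed for arbitrary partitions, the value can come from the TypedProperties the Transformer already receives; a hypothetical variant follows (the property key is invented for this sketch):

    public static class ConfigurablePartitionTransformer implements Transformer {

      @Override
      public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                                Dataset<Row> rowDataset, TypedProperties properties) {
        // "hoodie.test.partition.to.delete" is a made-up key for this sketch.
        String partition = properties.getString("hoodie.test.partition.to.delete");
        return rowDataset.filter("partition_path == '" + partition + "'");
      }
    }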