This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 779edc0 [HUDI-344] Add partitioner param to Exporter (#1405)
779edc0 is described below
commit 779edc068865898049569da0fe750574f93a0dca
Author: Raymond Xu <[email protected]>
AuthorDate: Wed Mar 18 04:24:04 2020 -0700
[HUDI-344] Add partitioner param to Exporter (#1405)
---
.../hudi/utilities/HoodieSnapshotExporter.java | 126 +++++++++++++--------
.../hudi/utilities/TestHoodieSnapshotExporter.java | 110 ++++++++++++++++--
2 files changed, 178 insertions(+), 58 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index b58b5d3..c39daa7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -18,16 +18,9 @@
package org.apache.hudi.utilities;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.SerializableConfiguration;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
@@ -36,6 +29,18 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+
+import com.beust.jcommander.IValueValidator;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -45,41 +50,66 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.execution.datasources.DataSource;
-
-import scala.Tuple2;
-import scala.collection.JavaConversions;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+
/**
* Export the latest records of Hudi dataset to a set of external files (e.g.,
plain parquet files).
*
* @experimental This export is an experimental tool. If you want to export
hudi to hudi, please use HoodieSnapshotCopier.
*/
public class HoodieSnapshotExporter {
+
+ @FunctionalInterface
+ public interface Partitioner {
+
+ DataFrameWriter<Row> partition(Dataset<Row> source);
+
+ }
+
private static final Logger LOG =
LogManager.getLogger(HoodieSnapshotExporter.class);
+ public static class OutputFormatValidator implements IValueValidator<String>
{
+
+ static final String HUDI = "hudi";
+ static final List<String> FORMATS = ImmutableList.of("json", "parquet",
HUDI);
+
+ @Override
+ public void validate(String name, String value) {
+ if (value == null || !FORMATS.contains(value)) {
+ throw new ParameterException(
+ String.format("Invalid output format: value:%s: supported
formats:%s", value, FORMATS));
+ }
+ }
+ }
+
public static class Config implements Serializable {
+
@Parameter(names = {"--source-base-path"}, description = "Base path for
the source Hudi dataset to be snapshotted", required = true)
- String sourceBasePath = null;
+ String sourceBasePath;
- @Parameter(names = {"--target-base-path"}, description = "Base path for
the target output files (snapshots)", required = true)
- String targetOutputPath = null;
+ @Parameter(names = {"--target-output-path"}, description = "Base path for
the target output files (snapshots)", required = true)
+ String targetOutputPath;
- @Parameter(names = {"--output-format"}, description = "e.g. Hudi or
Parquet", required = true)
+ @Parameter(names = {"--output-format"}, description = "Output format for
the exported dataset; accept these values: json|parquet|hudi", required = true,
+ validateValueWith = OutputFormatValidator.class)
String outputFormat;
@Parameter(names = {"--output-partition-field"}, description = "A field to
be used by Spark repartitioning")
- String outputPartitionField;
+ String outputPartitionField = null;
+
+ @Parameter(names = {"--output-partitioner"}, description = "A class to
facilitate custom repartitioning")
+ String outputPartitioner = null;
}
- public int export(SparkSession spark, Config cfg) throws IOException {
+ public void export(SparkSession spark, Config cfg) throws IOException {
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath,
jsc.hadoopConfiguration());
@@ -92,7 +122,7 @@ public class HoodieSnapshotExporter {
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
if (!latestCommit.isPresent()) {
LOG.error("No commits present. Nothing to snapshot");
- return -1;
+ return;
}
final String latestCommitTimestamp = latestCommit.get().getTimestamp();
LOG.info(String.format("Starting to snapshot latest version files which
are also no-late-than %s.",
@@ -106,28 +136,8 @@ public class HoodieSnapshotExporter {
dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition,
latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
}
- try {
- DataSource.lookupDataSource(cfg.outputFormat,
spark.sessionState().conf());
- } catch (Exception e) {
- LOG.error(String.format("The %s output format is not supported! ",
cfg.outputFormat));
- return -1;
- }
- if (!cfg.outputFormat.equalsIgnoreCase("hudi")) {
- // Do transformation
- // A field to do simple Spark repartitioning
- DataFrameWriter<Row> write = null;
- Dataset<Row> original =
spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
- List<Column> needColumns =
Arrays.asList(original.columns()).stream().filter(col ->
!col.startsWith("_hoodie_")).map(col -> new
Column(col)).collect(Collectors.toList());
- Dataset<Row> reader =
original.select(JavaConversions.asScalaIterator(needColumns.iterator()).toSeq());
- if (!StringUtils.isNullOrEmpty(cfg.outputPartitionField)) {
- write = reader.repartition(new Column(cfg.outputPartitionField))
- .write().partitionBy(cfg.outputPartitionField);
- } else {
- write = reader.write();
- }
- write.format(cfg.outputFormat)
- .mode(SaveMode.Overwrite)
- .save(cfg.targetOutputPath);
+ if (!cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
+ exportAsNonHudi(spark, cfg, dataFiles);
} else {
// No transformation is needed for output format "HUDI", just copy the
original files.
copySnapshot(jsc, fs, cfg, partitions, dataFiles,
latestCommitTimestamp, serConf);
@@ -136,16 +146,34 @@ public class HoodieSnapshotExporter {
} else {
LOG.info("The job has 0 partition to copy.");
}
- return 0;
+ }
+
+ private void exportAsNonHudi(SparkSession spark, Config cfg, List<String>
dataFiles) {
+ Partitioner defaultPartitioner = dataset -> {
+ Dataset<Row> hoodieDroppedDataset =
dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
+ return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
+ ? hoodieDroppedDataset.write()
+ : hoodieDroppedDataset.repartition(new
Column(cfg.outputPartitionField)).write().partitionBy(cfg.outputPartitionField);
+ };
+
+ Partitioner partitioner = StringUtils.isNullOrEmpty(cfg.outputPartitioner)
+ ? defaultPartitioner
+ : ReflectionUtils.loadClass(cfg.outputPartitioner);
+
+ Dataset<Row> sourceDataset =
spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
+ partitioner.partition(sourceDataset)
+ .format(cfg.outputFormat)
+ .mode(SaveMode.Overwrite)
+ .save(cfg.targetOutputPath);
}
private void copySnapshot(JavaSparkContext jsc,
- FileSystem fs,
- Config cfg,
- List<String> partitions,
- List<String> dataFiles,
- String latestCommitTimestamp,
- SerializableConfiguration serConf) throws
IOException {
+ FileSystem fs,
+ Config cfg,
+ List<String> partitions,
+ List<String> dataFiles,
+ String latestCommitTimestamp,
+ SerializableConfiguration serConf) throws IOException {
// Make sure the output directory is empty
Path outputPath = new Path(cfg.targetOutputPath);
if (fs.exists(outputPath)) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
index f624247..6eb15a2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -29,13 +29,20 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
+import org.apache.hudi.utilities.HoodieSnapshotExporter.OutputFormatValidator;
+import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
+import com.beust.jcommander.ParameterException;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Before;
@@ -52,6 +59,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RunWith(Enclosed.class)
@@ -62,7 +70,7 @@ public class TestHoodieSnapshotExporter {
static final Logger LOG = LogManager.getLogger(ExporterTestHarness.class);
static final int NUM_RECORDS = 100;
static final String COMMIT_TIME = "20200101000000";
- static final String PARTITION_PATH = "2020/01/01";
+ static final String PARTITION_PATH = "2020";
static final String TABLE_NAME = "testing";
String sourcePath;
String targetPath;
@@ -119,12 +127,19 @@ public class TestHoodieSnapshotExporter {
public static class TestHoodieSnapshotExporterForHudi extends
ExporterTestHarness {
- @Test
- public void testExportAsHudi() throws IOException {
- HoodieSnapshotExporter.Config cfg = new Config();
+ private HoodieSnapshotExporter.Config cfg;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ cfg = new Config();
cfg.sourceBasePath = sourcePath;
cfg.targetOutputPath = targetPath;
- cfg.outputFormat = "hudi";
+ cfg.outputFormat = OutputFormatValidator.HUDI;
+ }
+
+ @Test
+ public void testExportAsHudi() throws IOException {
new
HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(),
cfg);
// Check results
@@ -151,10 +166,6 @@ public class TestHoodieSnapshotExporter {
dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
// export
- HoodieSnapshotExporter.Config cfg = new Config();
- cfg.sourceBasePath = sourcePath;
- cfg.targetOutputPath = targetPath;
- cfg.outputFormat = "hudi";
new
HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(),
cfg);
// Check results
@@ -185,4 +196,85 @@ public class TestHoodieSnapshotExporter {
assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
}
}
+
+ public static class TestHoodieSnapshotExporterForRepartitioning extends
ExporterTestHarness {
+
+ private static final String PARTITION_NAME = "year";
+
+ public static class UserDefinedPartitioner implements Partitioner {
+
+ @Override
+ public DataFrameWriter<Row> partition(Dataset<Row> source) {
+ return source
+ .withColumnRenamed(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
PARTITION_NAME)
+ .repartition(new Column(PARTITION_NAME))
+ .write()
+ .partitionBy(PARTITION_NAME);
+ }
+ }
+
+ private HoodieSnapshotExporter.Config cfg;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ cfg = new Config();
+ cfg.sourceBasePath = sourcePath;
+ cfg.targetOutputPath = targetPath;
+ cfg.outputFormat = "json";
+ }
+
+ @Test
+ public void testExportWithPartitionField() throws IOException {
+ // `driver` field is set in HoodieTestDataGenerator
+ cfg.outputPartitionField = "driver";
+ new
HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(),
cfg);
+
+ assertEquals(NUM_RECORDS,
sqlContext.read().format("json").load(targetPath).count());
+ assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+ assertTrue(dfs.listStatus(new Path(targetPath)).length > 1);
+ }
+
+ @Test
+ public void testExportForUserDefinedPartitioner() throws IOException {
+ cfg.outputPartitioner = UserDefinedPartitioner.class.getName();
+ new
HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(),
cfg);
+
+ assertEquals(NUM_RECORDS,
sqlContext.read().format("json").load(targetPath).count());
+ assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+ assertTrue(dfs.exists(new Path(String.format("%s/%s=%s", targetPath,
PARTITION_NAME, PARTITION_PATH))));
+ }
+ }
+
+ @RunWith(Parameterized.class)
+ public static class TestHoodieSnapshotExporterInputValidation {
+
+ @Parameters
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"json", true}, {"parquet", true}, {"hudi", true},
+ {"JSON", false}, {"foo", false}, {null, false}, {"", false}
+ });
+ }
+
+ @Parameter
+ public String format;
+ @Parameter(1)
+ public boolean isValid;
+
+ @Test
+ public void testValidateOutputFormat() {
+ Throwable t = null;
+ try {
+ new OutputFormatValidator().validate(null, format);
+ } catch (Exception e) {
+ t = e;
+ }
+ if (isValid) {
+ assertNull(t);
+ } else {
+ assertTrue(t instanceof ParameterException);
+ }
+ }
+ }
}