This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 3a2fe13  [HUDI-701] Add unit test for HDFSParquetImportCommand (#1574)
3a2fe13 is described below

commit 3a2fe13fcb7c168f8ff023e3bdb6ae482b400316
Author: hongdd <jn_...@163.com>
AuthorDate: Thu May 14 19:15:49 2020 +0800

    [HUDI-701] Add unit test for HDFSParquetImportCommand (#1574)
---
 hudi-cli/pom.xml                                   |   7 +
 .../cli/commands/HDFSParquetImportCommand.java     |   8 +-
 .../org/apache/hudi/cli/commands/SparkMain.java    |  19 +--
 .../cli/integ/ITTestHDFSParquetImportCommand.java  | 186 +++++++++++++++++++++
 .../functional/TestHDFSParquetImporter.java        |   8 +-
 5 files changed, 209 insertions(+), 19 deletions(-)

diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index fed2bf9..dbb4463 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -194,6 +194,13 @@
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
 
     <!-- Logging -->
     <dependency>
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
index 0f1db50..a31f310 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.cli.commands;
 
-import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
@@ -57,6 +56,7 @@ public class HDFSParquetImportCommand implements CommandMarker {
       @CliOption(key = "schemaFilePath", mandatory = true,
           help = "Path for Avro schema file") final String schemaFilePath,
       @CliOption(key = "format", mandatory = true, help = "Format for the input data") final String format,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
       @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory,
       @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry,
       @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for importing",
@@ -66,8 +66,6 @@ public class HDFSParquetImportCommand implements CommandMarker {
 
     (new FormatValidator()).validate("format", format);
 
-    boolean initialized = HoodieCLI.initConf();
-    HoodieCLI.initFS(initialized);
 
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
@@ -78,8 +76,8 @@ public class HDFSParquetImportCommand implements CommandMarker {
       cmd = SparkCommand.UPSERT.toString();
     }
 
-    sparkLauncher.addAppArgs(cmd, srcPath, targetPath, tableName, tableType, rowKeyField, partitionPathField,
-        parallelism, schemaFilePath, sparkMemory, retry, propsFilePath);
+    sparkLauncher.addAppArgs(cmd, master, sparkMemory, srcPath, targetPath, tableName, tableType, rowKeyField,
+        partitionPathField, parallelism, schemaFilePath, retry, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
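For reference: with this change the CLI command accepts a Spark master alongside the existing options. An illustrative invocation, mirroring the integration test added below (paths and values are examples only, not part of the patch):

    hdfsparquetimport --srcPath /tmp/source --targetPath /tmp/test_table --tableName test_table --tableType COPY_ON_WRITE --rowKeyField _row_key --partitionPathField timestamp --parallelism 1 --schemaFilePath /tmp/file.schema --format parquet --sparkMemory 2G --retry 1 --sparkMaster local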
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 5d8972d..be9d7dd 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -82,17 +82,17 @@ public class SparkMain {
         break;
       case IMPORT:
      case UPSERT:
-        assert (args.length >= 12);
+        assert (args.length >= 13);
         String propsFilePath = null;
-        if (!StringUtils.isNullOrEmpty(args[11])) {
-          propsFilePath = args[11];
+        if (!StringUtils.isNullOrEmpty(args[12])) {
+          propsFilePath = args[12];
         }
         List<String> configs = new ArrayList<>();
-        if (args.length > 12) {
-          configs.addAll(Arrays.asList(args).subList(12, args.length));
+        if (args.length > 13) {
+          configs.addAll(Arrays.asList(args).subList(13, args.length));
         }
-        returnCode = dataLoad(jsc, command, args[1], args[2], args[3], args[4], args[5], args[6],
-            Integer.parseInt(args[7]), args[8], args[9], Integer.parseInt(args[10]), propsFilePath, configs);
+        returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8],
+            Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
         break;
       case COMPACT_RUN:
         assert (args.length >= 9);
@@ -163,7 +163,7 @@ public class SparkMain {
   private static boolean sparkMasterContained(SparkCommand command) {
     List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
         SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
-        SparkCommand.DEDUPLICATE);
+        SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE);
     return masterContained.contains(command);
   }
@@ -177,7 +177,7 @@ public class SparkMain {
   }
 
   private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName,
-      String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMemory,
+      String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile,
       int retry, String propsFilePath, List<String> configs) {
     Config cfg = new Config();
     cfg.command = command;
@@ -191,7 +191,6 @@ public class SparkMain {
     cfg.schemaFile = schemaFile;
     cfg.propsFilePath = propsFilePath;
     cfg.configs = configs;
-    jsc.getConf().set("spark.executor.memory", sparkMemory);
     return new HDFSParquetImporter(cfg).dataImport(jsc, retry);
   }
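Because the master and memory now travel through the launcher first, every positional index SparkMain reads for IMPORT/UPSERT shifts. A minimal sketch of the resulting argument layout, mirroring the reordered addAppArgs call and the parsing above (values are illustrative; this snippet is not itself part of the patch):

    // Positional layout for IMPORT/UPSERT after this change.
    String[] args = {
        "IMPORT",           // args[0]:  command (IMPORT or UPSERT)
        "local",            // args[1]:  Spark master, new in this change
        "2G",               // args[2]:  Spark executor memory, previously at args[9]
        "/tmp/source",      // args[3]:  srcPath
        "/tmp/test_table",  // args[4]:  targetPath
        "test_table",       // args[5]:  tableName
        "COPY_ON_WRITE",    // args[6]:  tableType
        "_row_key",         // args[7]:  rowKeyField
        "timestamp",        // args[8]:  partitionPathField
        "1",                // args[9]:  parallelism
        "/tmp/file.schema", // args[10]: schemaFilePath
        "1",                // args[11]: retry
        ""                  // args[12]: propsFilePath; args[13..] are passed through as extra configs
    };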
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
new file mode 100644
index 0000000..347396b
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.utilities.HDFSParquetImporter;
+import org.apache.hudi.utilities.functional.TestHDFSParquetImporter;
+import org.apache.hudi.utilities.functional.TestHDFSParquetImporter.HoodieTripModel;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link org.apache.hudi.cli.commands.HDFSParquetImportCommand}.
+ */
+public class ITTestHDFSParquetImportCommand extends AbstractShellIntegrationTest {
+
+  private Path sourcePath;
+  private Path targetPath;
+  private String tableName;
+  private String schemaFile;
+  private String tablePath;
+
+  private List<GenericRecord> insertData;
+  private TestHDFSParquetImporter importer;
+
+  @BeforeEach
+  public void init() throws IOException, ParseException {
+    tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+    sourcePath = new Path(basePath, "source");
+    targetPath = new Path(tablePath);
+    schemaFile = new Path(basePath, "file.schema").toString();
+
+    // create schema file
+    try (FSDataOutputStream schemaFileOS = fs.create(new Path(schemaFile))) {
+      schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
+    }
+
+    importer = new TestHDFSParquetImporter();
+    insertData = importer.createInsertRecords(sourcePath);
+  }
+
+  /**
+   * Test case for 'hdfsparquetimport' with insert.
+   */
+  @Test
+  public void testConvertWithInsert() throws IOException {
+    String command = String.format("hdfsparquetimport --srcPath %s --targetPath %s --tableName %s "
+        + "--tableType %s --rowKeyField %s" + " --partitionPathField %s --parallelism %s "
+        + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s --sparkMaster %s",
+        sourcePath.toString(), targetPath.toString(), tableName, HoodieTableType.COPY_ON_WRITE.name(),
+        "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1", "local");
+    CommandResult cr = getShell().executeCommand(command);
+
+    assertAll("Command runs successfully",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertEquals("Table imported to hoodie format", cr.getResult().toString()));
+
+    // Check that the hudi table exists
+    String metaPath = targetPath + File.separator + HoodieTableMetaClient.METAFOLDER_NAME;
+    assertTrue(Files.exists(Paths.get(metaPath)), "Hoodie table does not exist.");
+
+    // Load meta data
+    new TableCommand().connect(targetPath.toString(), TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have only 1 commit.");
+
+    verifyResultData(insertData);
+  }
+
+  /**
+   * Test case for 'hdfsparquetimport' with upsert.
+   */
+  @Test
+  public void testConvertWithUpsert() throws IOException, ParseException {
+    Path upsertFolder = new Path(basePath, "testUpsertSrc");
+    List<GenericRecord> upsertData = importer.createUpsertRecords(upsertFolder);
+
+    // first insert records
+    HDFSParquetImporter.Config cfg = importer.getHDFSParquetImporterConfig(sourcePath.toString(), tablePath,
+        tableName, HoodieTableType.COPY_ON_WRITE.name(), "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+
+    // Load meta data
+    new TableCommand().connect(targetPath.toString(), TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    // check that the insert instant exists
+    assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have only 1 commit.");
+
+    String command = String.format("hdfsparquetimport --srcPath %s --targetPath %s --tableName %s "
+        + "--tableType %s --rowKeyField %s" + " --partitionPathField %s --parallelism %s "
+        + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s --sparkMaster %s --upsert %s",
+        upsertFolder.toString(), targetPath.toString(), tableName, HoodieTableType.COPY_ON_WRITE.name(),
+        "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1", "local", "true");
+    CommandResult cr = getShell().executeCommand(command);
+
+    assertAll("Command runs successfully",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertEquals("Table imported to hoodie format", cr.getResult().toString()));
+
+    // reload meta client
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(2, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have 2 commits.");
+
+    // construct expected result: drop the insert records overwritten by the upsert, then add the upsert data
+    List<GenericRecord> expectData = insertData.subList(11, 96);
+    expectData.addAll(upsertData);
+
+    verifyResultData(expectData);
+  }
+
+  /**
+   * Verify that the result read back from the table matches the expected records.
+   */
+  private void verifyResultData(List<GenericRecord> expectData) {
+    Dataset<Row> ds = HoodieClientTestUtils.read(jsc, tablePath, sqlContext, fs, tablePath + "/*/*/*/*");
+
+    List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+    List<HoodieTripModel> result = readData.stream().map(row ->
+        new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+            row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+        .collect(Collectors.toList());
+
+    List<HoodieTripModel> expected = expectData.stream().map(g ->
+        new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+            g.get("_row_key").toString(),
+            g.get("rider").toString(),
+            g.get("driver").toString(),
+            Double.parseDouble(g.get("begin_lat").toString()),
+            Double.parseDouble(g.get("begin_lon").toString()),
+            Double.parseDouble(g.get("end_lat").toString()),
+            Double.parseDouble(g.get("end_lon").toString())))
+        .collect(Collectors.toList());
+
+    assertAll("Result list equals",
+        () -> assertEquals(expected.size(), result.size()),
+        () -> assertTrue(result.containsAll(expected) && expected.containsAll(result)));
+  }
+}
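The pom.xml change at the top of this patch is what makes the test above compile: hudi-cli now pulls in the hudi-utilities test-jar, and the next diff widens its helpers from private to public so they can be reused across modules. A minimal sketch of that reuse, mirroring what init() in the test already does (all names come from the patch):

    // From a hudi-cli test, reuse the hudi-utilities test fixtures:
    TestHDFSParquetImporter importer = new TestHDFSParquetImporter();
    List<GenericRecord> inserts = importer.createInsertRecords(sourcePath);
    HDFSParquetImporter.Config cfg = importer.getHDFSParquetImporterConfig(sourcePath.toString(), tablePath,
        tableName, HoodieTableType.COPY_ON_WRITE.name(), "_row_key", "timestamp", 1, schemaFile);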
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
index f738cdf..a8474f6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
@@ -264,7 +264,7 @@ public class TestHDFSParquetImporter implements Serializable {
     }
   }
 
-  private List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
+  public List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
     Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
     long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
     List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -281,7 +281,7 @@ public class TestHDFSParquetImporter implements Serializable {
     return records;
   }
 
-  private List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
+  public List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
     Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
     long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
     List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -361,7 +361,7 @@ public class TestHDFSParquetImporter implements Serializable {
     }
   }
 
-  private HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName,
+  public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName,
       String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile) {
     HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
     cfg.srcPath = srcPath;
@@ -395,7 +395,7 @@ public class TestHDFSParquetImporter implements Serializable {
     double endLat;
     double endLon;
 
-    private HoodieTripModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+    public HoodieTripModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
        double beginLon, double endLat, double endLon) {
       this.timestamp = timestamp;
       this.rowKey = rowKey;
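As a usage note, the new integration test can typically be run on its own from the repository root. The exact invocation below is an assumption, since it depends on how this module binds its test phases:

    mvn -pl hudi-cli -am test -Dtest=ITTestHDFSParquetImportCommand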