This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3a2fe13 [HUDI-701] Add unit test for HDFSParquetImportCommand (#1574)
3a2fe13 is described below
commit 3a2fe13fcb7c168f8ff023e3bdb6ae482b400316
Author: hongdd <[email protected]>
AuthorDate: Thu May 14 19:15:49 2020 +0800
[HUDI-701] Add unit test for HDFSParquetImportCommand (#1574)
---
hudi-cli/pom.xml | 7 +
.../cli/commands/HDFSParquetImportCommand.java | 8 +-
.../org/apache/hudi/cli/commands/SparkMain.java | 19 +--
.../cli/integ/ITTestHDFSParquetImportCommand.java | 186 +++++++++++++++++++++
.../functional/TestHDFSParquetImporter.java | 8 +-
5 files changed, 209 insertions(+), 19 deletions(-)
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index fed2bf9..dbb4463 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -194,6 +194,13 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<!-- Logging -->
<dependency>
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
index 0f1db50..a31f310 100644
---
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
+++
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
@@ -18,7 +18,6 @@
package org.apache.hudi.cli.commands;
-import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
@@ -57,6 +56,7 @@ public class HDFSParquetImportCommand implements
CommandMarker {
@CliOption(key = "schemaFilePath", mandatory = true,
help = "Path for Avro schema file") final String schemaFilePath,
@CliOption(key = "format", mandatory = true, help = "Format for the
input data") final String format,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help =
"Spark Master") String master,
@CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor
memory") final String sparkMemory,
@CliOption(key = "retry", mandatory = true, help = "Number of retries")
final String retry,
@CliOption(key = "propsFilePath", help = "path to properties file on
localfs or dfs with configurations for hoodie client for importing",
@@ -66,8 +66,6 @@ public class HDFSParquetImportCommand implements
CommandMarker {
(new FormatValidator()).validate("format", format);
- boolean initialized = HoodieCLI.initConf();
- HoodieCLI.initFS(initialized);
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
@@ -78,8 +76,8 @@ public class HDFSParquetImportCommand implements
CommandMarker {
cmd = SparkCommand.UPSERT.toString();
}
- sparkLauncher.addAppArgs(cmd, srcPath, targetPath, tableName, tableType,
rowKeyField, partitionPathField,
- parallelism, schemaFilePath, sparkMemory, retry, propsFilePath);
+ sparkLauncher.addAppArgs(cmd, master, sparkMemory, srcPath, targetPath,
tableName, tableType, rowKeyField,
+ partitionPathField, parallelism, schemaFilePath, retry, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 5d8972d..be9d7dd 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -82,17 +82,17 @@ public class SparkMain {
break;
case IMPORT:
case UPSERT:
- assert (args.length >= 12);
+ assert (args.length >= 13);
String propsFilePath = null;
- if (!StringUtils.isNullOrEmpty(args[11])) {
- propsFilePath = args[11];
+ if (!StringUtils.isNullOrEmpty(args[12])) {
+ propsFilePath = args[12];
}
List<String> configs = new ArrayList<>();
- if (args.length > 12) {
- configs.addAll(Arrays.asList(args).subList(12, args.length));
+ if (args.length > 13) {
+ configs.addAll(Arrays.asList(args).subList(13, args.length));
}
- returnCode = dataLoad(jsc, command, args[1], args[2], args[3],
args[4], args[5], args[6],
- Integer.parseInt(args[7]), args[8], args[9],
Integer.parseInt(args[10]), propsFilePath, configs);
+ returnCode = dataLoad(jsc, command, args[3], args[4], args[5],
args[6], args[7], args[8],
+ Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]),
propsFilePath, configs);
break;
case COMPACT_RUN:
assert (args.length >= 9);
@@ -163,7 +163,7 @@ public class SparkMain {
private static boolean sparkMasterContained(SparkCommand command) {
List<SparkCommand> masterContained =
Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
SparkCommand.COMPACT_UNSCHEDULE_PLAN,
SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
- SparkCommand.DEDUPLICATE);
+ SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE);
return masterContained.contains(command);
}
@@ -177,7 +177,7 @@ public class SparkMain {
}
private static int dataLoad(JavaSparkContext jsc, String command, String
srcPath, String targetPath, String tableName,
- String tableType, String rowKey, String partitionKey, int parallelism,
String schemaFile, String sparkMemory,
+ String tableType, String rowKey, String partitionKey, int parallelism,
String schemaFile,
int retry, String propsFilePath, List<String> configs) {
Config cfg = new Config();
cfg.command = command;
@@ -191,7 +191,6 @@ public class SparkMain {
cfg.schemaFile = schemaFile;
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
- jsc.getConf().set("spark.executor.memory", sparkMemory);
return new HDFSParquetImporter(cfg).dataImport(jsc, retry);
}
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
new file mode 100644
index 0000000..347396b
--- /dev/null
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.utilities.HDFSParquetImporter;
+import org.apache.hudi.utilities.functional.TestHDFSParquetImporter;
+import
org.apache.hudi.utilities.functional.TestHDFSParquetImporter.HoodieTripModel;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link
org.apache.hudi.cli.commands.HDFSParquetImportCommand}.
+ */
+public class ITTestHDFSParquetImportCommand extends
AbstractShellIntegrationTest {
+
+ private Path sourcePath;
+ private Path targetPath;
+ private String tableName;
+ private String schemaFile;
+ private String tablePath;
+
+ private List<GenericRecord> insertData;
+ private TestHDFSParquetImporter importer;
+
+ @BeforeEach
+ public void init() throws IOException, ParseException {
+ tableName = "test_table";
+ tablePath = basePath + File.separator + tableName;
+ sourcePath = new Path(basePath, "source");
+ targetPath = new Path(tablePath);
+ schemaFile = new Path(basePath, "file.schema").toString();
+
+ // create schema file
+ try (FSDataOutputStream schemaFileOS = fs.create(new Path(schemaFile))) {
+
schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
+ }
+
+ importer = new TestHDFSParquetImporter();
+ insertData = importer.createInsertRecords(sourcePath);
+ }
+
+ /**
+ * Test case for 'hdfsparquetimport' with insert.
+ */
+ @Test
+ public void testConvertWithInsert() throws IOException {
+ String command = String.format("hdfsparquetimport --srcPath %s
--targetPath %s --tableName %s "
+ + "--tableType %s --rowKeyField %s" + " --partitionPathField %s
--parallelism %s "
+ + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s
--sparkMaster %s",
+ sourcePath.toString(), targetPath.toString(), tableName,
HoodieTableType.COPY_ON_WRITE.name(),
+ "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1",
"local");
+ CommandResult cr = getShell().executeCommand(command);
+
+ assertAll("Command run success",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertEquals("Table imported to hoodie format",
cr.getResult().toString()));
+
+ // Check that the hudi table exists
+ String metaPath = targetPath + File.separator +
HoodieTableMetaClient.METAFOLDER_NAME;
+ assertTrue(Files.exists(Paths.get(metaPath)), "Hoodie table not exist.");
+
+ // Load meta data
+ new TableCommand().connect(targetPath.toString(),
TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
+ metaClient = HoodieCLI.getTableMetaClient();
+
+ assertEquals(1,
metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should
only 1 commit.");
+
+ verifyResultData(insertData);
+ }
+
+ /**
+ * Test case for 'hdfsparquetimport' with upsert.
+ */
+ @Test
+ public void testConvertWithUpsert() throws IOException, ParseException {
+ Path upsertFolder = new Path(basePath, "testUpsertSrc");
+ List<GenericRecord> upsertData =
importer.createUpsertRecords(upsertFolder);
+
+ // first insert records
+ HDFSParquetImporter.Config cfg =
importer.getHDFSParquetImporterConfig(sourcePath.toString(), tablePath,
+ tableName, HoodieTableType.COPY_ON_WRITE.name(), "_row_key",
"timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+
+ // Load meta data
+ new TableCommand().connect(targetPath.toString(),
TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
+ metaClient = HoodieCLI.getTableMetaClient();
+
+ // check that the insert instant exists
+ assertEquals(1,
metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should
only 1 commit.");
+
+ String command = String.format("hdfsparquetimport --srcPath %s
--targetPath %s --tableName %s "
+ + "--tableType %s --rowKeyField %s" + " --partitionPathField %s
--parallelism %s "
+ + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s
--sparkMaster %s --upsert %s",
+ upsertFolder.toString(), targetPath.toString(), tableName,
HoodieTableType.COPY_ON_WRITE.name(),
+ "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1",
"local", "true");
+ CommandResult cr = getShell().executeCommand(command);
+
+ assertAll("Command run success",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertEquals("Table imported to hoodie format",
cr.getResult().toString()));
+
+ // reload meta client
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ assertEquals(2,
metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should
have 2 commit.");
+
+ // construct expected result: drop the first 11 insert records and add the upsert data.
+ List<GenericRecord> expectData = insertData.subList(11, 96);
+ expectData.addAll(upsertData);
+
+ verifyResultData(expectData);
+ }
+
+ /**
+ * Verifies that the actual result data equals the expected data.
+ */
+ private void verifyResultData(List<GenericRecord> expectData) {
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, tablePath, sqlContext,
fs, tablePath + "/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver",
"begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieTripModel> result = readData.stream().map(row ->
+ new HoodieTripModel(row.getDouble(0), row.getString(1),
row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieTripModel> expected = expectData.stream().map(g ->
+ new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+ g.get("_row_key").toString(),
+ g.get("rider").toString(),
+ g.get("driver").toString(),
+ Double.parseDouble(g.get("begin_lat").toString()),
+ Double.parseDouble(g.get("begin_lon").toString()),
+ Double.parseDouble(g.get("end_lat").toString()),
+ Double.parseDouble(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertAll("Result list equals",
+ () -> assertEquals(expected.size(), result.size()),
+ () -> assertTrue(result.containsAll(expected) &&
expected.containsAll(result)));
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
index f738cdf..a8474f6 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
@@ -264,7 +264,7 @@ public class TestHDFSParquetImporter implements
Serializable {
}
}
- private List<GenericRecord> createInsertRecords(Path srcFolder) throws
ParseException, IOException {
+ public List<GenericRecord> createInsertRecords(Path srcFolder) throws
ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime =
HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -281,7 +281,7 @@ public class TestHDFSParquetImporter implements
Serializable {
return records;
}
- private List<GenericRecord> createUpsertRecords(Path srcFolder) throws
ParseException, IOException {
+ public List<GenericRecord> createUpsertRecords(Path srcFolder) throws
ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime =
HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -361,7 +361,7 @@ public class TestHDFSParquetImporter implements
Serializable {
}
}
- private HDFSParquetImporter.Config getHDFSParquetImporterConfig(String
srcPath, String targetPath, String tableName,
+ public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String
srcPath, String targetPath, String tableName,
String tableType, String rowKey, String partitionKey, int parallelism,
String schemaFile) {
HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
cfg.srcPath = srcPath;
@@ -395,7 +395,7 @@ public class TestHDFSParquetImporter implements
Serializable {
double endLat;
double endLon;
- private HoodieTripModel(double timestamp, String rowKey, String rider,
String driver, double beginLat,
+ public HoodieTripModel(double timestamp, String rowKey, String rider,
String driver, double beginLat,
double beginLon, double endLat, double endLon) {
this.timestamp = timestamp;
this.rowKey = rowKey;