This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a2fe13  [HUDI-701] Add unit test for HDFSParquetImportCommand (#1574)
3a2fe13 is described below

commit 3a2fe13fcb7c168f8ff023e3bdb6ae482b400316
Author: hongdd <jn_...@163.com>
AuthorDate: Thu May 14 19:15:49 2020 +0800

    [HUDI-701] Add unit test for HDFSParquetImportCommand (#1574)
---
 hudi-cli/pom.xml                                   |   7 +
 .../cli/commands/HDFSParquetImportCommand.java     |   8 +-
 .../org/apache/hudi/cli/commands/SparkMain.java    |  19 +--
 .../cli/integ/ITTestHDFSParquetImportCommand.java  | 186 +++++++++++++++++++++
 .../functional/TestHDFSParquetImporter.java        |   8 +-
 5 files changed, 209 insertions(+), 19 deletions(-)
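
This change adds a --sparkMaster option to the hdfsparquetimport CLI command and forwards it to SparkMain ahead of the existing arguments, so the new integration test can run the import against a local master. As a rough usage sketch of the updated command (flag names and order taken from the test below; the paths and the "local" master value are illustrative only):

    hdfsparquetimport --srcPath /tmp/source --targetPath /tmp/test_table --tableName test_table
        --tableType COPY_ON_WRITE --rowKeyField _row_key --partitionPathField timestamp --parallelism 1
        --schemaFilePath /tmp/file.schema --format parquet --sparkMemory 2G --retry 1 --sparkMaster local

(wrapped here for readability; the CLI shell takes it as a single line)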

diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index fed2bf9..dbb4463 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -194,6 +194,13 @@
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
 
     <!-- Logging -->
     <dependency>
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
index 0f1db50..a31f310 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.cli.commands;
 
-import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
@@ -57,6 +56,7 @@ public class HDFSParquetImportCommand implements CommandMarker {
      @CliOption(key = "schemaFilePath", mandatory = true,
          help = "Path for Avro schema file") final String schemaFilePath,
      @CliOption(key = "format", mandatory = true, help = "Format for the input data") final String format,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
      @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory,
      @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry,
      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for importing",
@@ -66,8 +66,6 @@ public class HDFSParquetImportCommand implements CommandMarker {
 
     (new FormatValidator()).validate("format", format);
 
-    boolean initialized = HoodieCLI.initConf();
-    HoodieCLI.initFS(initialized);
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
 
@@ -78,8 +76,8 @@ public class HDFSParquetImportCommand implements CommandMarker {
       cmd = SparkCommand.UPSERT.toString();
     }
 
-    sparkLauncher.addAppArgs(cmd, srcPath, targetPath, tableName, tableType, rowKeyField, partitionPathField,
-        parallelism, schemaFilePath, sparkMemory, retry, propsFilePath);
+    sparkLauncher.addAppArgs(cmd, master, sparkMemory, srcPath, targetPath, tableName, tableType, rowKeyField,
+        partitionPathField, parallelism, schemaFilePath, retry, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 5d8972d..be9d7dd 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -82,17 +82,17 @@ public class SparkMain {
         break;
       case IMPORT:
       case UPSERT:
-        assert (args.length >= 12);
+        assert (args.length >= 13);
         String propsFilePath = null;
-        if (!StringUtils.isNullOrEmpty(args[11])) {
-          propsFilePath = args[11];
+        if (!StringUtils.isNullOrEmpty(args[12])) {
+          propsFilePath = args[12];
         }
         List<String> configs = new ArrayList<>();
-        if (args.length > 12) {
-          configs.addAll(Arrays.asList(args).subList(12, args.length));
+        if (args.length > 13) {
+          configs.addAll(Arrays.asList(args).subList(13, args.length));
         }
-        returnCode = dataLoad(jsc, command, args[1], args[2], args[3], args[4], args[5], args[6],
-            Integer.parseInt(args[7]), args[8], args[9], Integer.parseInt(args[10]), propsFilePath, configs);
+        returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8],
+            Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
         break;
       case COMPACT_RUN:
         assert (args.length >= 9);
@@ -163,7 +163,7 @@ public class SparkMain {
   private static boolean sparkMasterContained(SparkCommand command) {
     List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
         SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
-        SparkCommand.DEDUPLICATE);
+        SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE);
     return masterContained.contains(command);
   }
 
@@ -177,7 +177,7 @@ public class SparkMain {
   }
 
   private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName,
-      String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMemory,
+      String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile,
       int retry, String propsFilePath, List<String> configs) {
     Config cfg = new Config();
     cfg.command = command;
@@ -191,7 +191,6 @@ public class SparkMain {
     cfg.schemaFile = schemaFile;
     cfg.propsFilePath = propsFilePath;
     cfg.configs = configs;
-    jsc.getConf().set("spark.executor.memory", sparkMemory);
     return new HDFSParquetImporter(cfg).dataImport(jsc, retry);
   }
 
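For reference, the positional argument layout that SparkMain now expects for IMPORT/UPSERT, as read off the addAppArgs call and the args[] indices above (this mapping is inferred from the diff, not taken from project docs):

    // args[0]  IMPORT | UPSERT      args[5]  tableName            args[10]  schemaFilePath
    // args[1]  sparkMaster          args[6]  tableType            args[11]  retry
    // args[2]  sparkMemory          args[7]  rowKeyField          args[12]  propsFilePath
    // args[3]  srcPath              args[8]  partitionPathField   args[13+] extra hoodie client configs
    // args[4]  targetPath           args[9]  parallelism

args[1] and args[2] are presumably consumed by the launcher-side master/memory handling (hence IMPORT and UPSERT joining the sparkMasterContained list), which is also why dataLoad no longer takes sparkMemory and no longer sets spark.executor.memory itself.
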
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
new file mode 100644
index 0000000..347396b
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.utilities.HDFSParquetImporter;
+import org.apache.hudi.utilities.functional.TestHDFSParquetImporter;
+import org.apache.hudi.utilities.functional.TestHDFSParquetImporter.HoodieTripModel;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link org.apache.hudi.cli.commands.HDFSParquetImportCommand}.
+ */
+public class ITTestHDFSParquetImportCommand extends AbstractShellIntegrationTest {
+
+  private Path sourcePath;
+  private Path targetPath;
+  private String tableName;
+  private String schemaFile;
+  private String tablePath;
+
+  private List<GenericRecord> insertData;
+  private TestHDFSParquetImporter importer;
+
+  @BeforeEach
+  public void init() throws IOException, ParseException {
+    tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+    sourcePath = new Path(basePath, "source");
+    targetPath = new Path(tablePath);
+    schemaFile = new Path(basePath, "file.schema").toString();
+
+    // create schema file
+    try (FSDataOutputStream schemaFileOS = fs.create(new Path(schemaFile))) {
+      schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
+    }
+
+    importer = new TestHDFSParquetImporter();
+    insertData = importer.createInsertRecords(sourcePath);
+  }
+
+  /**
+   * Test case for 'hdfsparquetimport' with insert.
+   */
+  @Test
+  public void testConvertWithInsert() throws IOException {
+    String command = String.format("hdfsparquetimport --srcPath %s --targetPath %s --tableName %s "
+        + "--tableType %s --rowKeyField %s" + " --partitionPathField %s --parallelism %s "
+        + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s --sparkMaster %s",
+        sourcePath.toString(), targetPath.toString(), tableName, HoodieTableType.COPY_ON_WRITE.name(),
+        "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1", "local");
+    CommandResult cr = getShell().executeCommand(command);
+
+    assertAll("Command run success",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertEquals("Table imported to hoodie format", cr.getResult().toString()));
+
+    // Check hudi table exist
+    String metaPath = targetPath + File.separator + HoodieTableMetaClient.METAFOLDER_NAME;
+    assertTrue(Files.exists(Paths.get(metaPath)), "Hoodie table not exist.");
+
+    // Load meta data
+    new TableCommand().connect(targetPath.toString(), TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should only 1 commit.");
+
+    verifyResultData(insertData);
+  }
+
+  /**
+   * Test case for 'hdfsparquetimport' with upsert.
+   */
+  @Test
+  public void testConvertWithUpsert() throws IOException, ParseException {
+    Path upsertFolder = new Path(basePath, "testUpsertSrc");
+    List<GenericRecord> upsertData = importer.createUpsertRecords(upsertFolder);
+
+    // first insert records
+    HDFSParquetImporter.Config cfg = importer.getHDFSParquetImporterConfig(sourcePath.toString(), tablePath,
+        tableName, HoodieTableType.COPY_ON_WRITE.name(), "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+
+    // Load meta data
+    new TableCommand().connect(targetPath.toString(), TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    // check if insert instant exist
+    assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should only 1 commit.");
+
+    String command = String.format("hdfsparquetimport --srcPath %s --targetPath %s --tableName %s "
+        + "--tableType %s --rowKeyField %s" + " --partitionPathField %s --parallelism %s "
+        + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s --sparkMaster %s --upsert %s",
+        upsertFolder.toString(), targetPath.toString(), tableName, HoodieTableType.COPY_ON_WRITE.name(),
+        "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1", "local", "true");
+    CommandResult cr = getShell().executeCommand(command);
+
+    assertAll("Command run success",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertEquals("Table imported to hoodie format", cr.getResult().toString()));
+
+    // reload meta client
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(2, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have 2 commit.");
+
+    // construct result, remove top 10 and add upsert data.
+    List<GenericRecord> expectData = insertData.subList(11, 96);
+    expectData.addAll(upsertData);
+
+    verifyResultData(expectData);
+  }
+
+  /**
+   * Method to verify result is equals to expect.
+   */
+  private void verifyResultData(List<GenericRecord> expectData) {
+    Dataset<Row> ds = HoodieClientTestUtils.read(jsc, tablePath, sqlContext, fs, tablePath + "/*/*/*/*");
+
+    List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+    List<HoodieTripModel> result = readData.stream().map(row ->
+        new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+            row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+        .collect(Collectors.toList());
+
+    List<HoodieTripModel> expected = expectData.stream().map(g ->
+        new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+            g.get("_row_key").toString(),
+            g.get("rider").toString(),
+            g.get("driver").toString(),
+            Double.parseDouble(g.get("begin_lat").toString()),
+            Double.parseDouble(g.get("begin_lon").toString()),
+            Double.parseDouble(g.get("end_lat").toString()),
+            Double.parseDouble(g.get("end_lon").toString())))
+        .collect(Collectors.toList());
+
+    assertAll("Result list equals",
+        () -> assertEquals(expected.size(), result.size()),
+        () -> assertTrue(result.containsAll(expected) && expected.containsAll(result)));
+  }
+}
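
To run just this new integration test locally, something along these lines should work (this invocation is an assumption: it presumes the ITTest* classes in hudi-cli run under the regular surefire test phase, and that the hudi-utilities test-jar is already in the local repository, e.g. via a prior mvn install -DskipTests from the repository root):

    mvn test -pl hudi-cli -Dtest=ITTestHDFSParquetImportCommand
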
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
index f738cdf..a8474f6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
@@ -264,7 +264,7 @@ public class TestHDFSParquetImporter implements Serializable {
     }
   }
 
-  private List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
+  public List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
     Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
     long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
     List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -281,7 +281,7 @@ public class TestHDFSParquetImporter implements Serializable {
     return records;
   }
 
-  private List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
+  public List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
     Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
     long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
     List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -361,7 +361,7 @@ public class TestHDFSParquetImporter implements Serializable {
     }
   }
 
-  private HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName,
+  public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName,
       String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile) {
     HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
     cfg.srcPath = srcPath;
@@ -395,7 +395,7 @@ public class TestHDFSParquetImporter implements Serializable {
     double endLat;
     double endLon;
 
-    private HoodieTripModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+    public HoodieTripModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
         double beginLon, double endLat, double endLon) {
       this.timestamp = timestamp;
       this.rowKey = rowKey;
