This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new aeebfcfcec2 [HUDI-8697] Revisit TestHDFSParquetImporter and 
TestHoodieSnapshotCopier (#12695)
aeebfcfcec2 is described below

commit aeebfcfcec271e8ff8f37e7f7ef2418d386f4c76
Author: Vova Kolmakov <[email protected]>
AuthorDate: Fri Mar 7 08:26:21 2025 +0700

    [HUDI-8697] Revisit TestHDFSParquetImporter and TestHoodieSnapshotCopier 
(#12695)
    
    Co-authored-by: Vova Kolmakov <[email protected]>
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 .../cli/commands/HDFSParquetImportCommand.java     |  93 -----
 .../org/apache/hudi/cli/commands/SparkMain.java    |  25 +-
 .../cli/integ/ITTestHDFSParquetImportCommand.java  | 200 -----------
 .../apache/hudi/utilities/HDFSParquetImporter.java | 301 ----------------
 .../hudi/utilities/HoodieSnapshotCopier.java       | 210 -----------
 .../functional/TestHDFSParquetImporter.java        | 394 ---------------------
 .../functional/TestHoodieSnapshotCopier.java       | 151 --------
 7 files changed, 2 insertions(+), 1372 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
deleted file mode 100644
index dafd15b174a..00000000000
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli.commands;
-
-import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
-import org.apache.hudi.cli.utils.InputStreamConsumer;
-import org.apache.hudi.cli.utils.SparkUtil;
-import org.apache.hudi.utilities.HDFSParquetImporter.FormatValidator;
-import org.apache.hudi.utilities.UtilHelpers;
-import org.apache.hudi.utilities.streamer.HoodieStreamer;
-
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.util.Utils;
-import org.springframework.shell.standard.ShellComponent;
-import org.springframework.shell.standard.ShellMethod;
-import org.springframework.shell.standard.ShellOption;
-
-import scala.collection.JavaConverters;
-
-/**
- * CLI command for importing parquet table to hudi table.
- *
- * @see HoodieStreamer
- * @deprecated This utility is deprecated in 0.10.0 and will be removed in 
0.11.0. Use {@link HoodieStreamer.Config#runBootstrap} instead.
- */
-@ShellComponent
-public class HDFSParquetImportCommand {
-
-  @ShellMethod(key = "hdfsparquetimport", value = "Imports Parquet table to a 
hoodie table")
-  public String convert(
-      @ShellOption(value = "--upsert", defaultValue = "false",
-          help = "Uses upsert API instead of the default insert API of 
WriteClient") boolean useUpsert,
-      @ShellOption(value = "--srcPath", help = "Base path for the input 
table") final String srcPath,
-      @ShellOption(value = "--targetPath",
-          help = "Base path for the target hoodie table") final String 
targetPath,
-      @ShellOption(value = "--tableName", help = "Table name") final String 
tableName,
-      @ShellOption(value = "--tableType", help = "Table type") final String 
tableType,
-      @ShellOption(value = "--rowKeyField", help = "Row key field name") final 
String rowKeyField,
-      @ShellOption(value = "--partitionPathField", defaultValue = "",
-          help = "Partition path field name") final String partitionPathField,
-      @ShellOption(value = {"--parallelism"},
-          help = "Parallelism for hoodie insert") final String parallelism,
-      @ShellOption(value = "--schemaFilePath",
-          help = "Path for Avro schema file") final String schemaFilePath,
-      @ShellOption(value = "--format", help = "Format for the input data") 
final String format,
-      @ShellOption(value = "--sparkMaster", defaultValue = "", help = "Spark 
Master") String master,
-      @ShellOption(value = "--sparkMemory", help = "Spark executor memory") 
final String sparkMemory,
-      @ShellOption(value = "--retry", help = "Number of retries") final String 
retry,
-      @ShellOption(value = "--propsFilePath", help = "path to properties file 
on localfs or dfs with configurations for hoodie client for importing",
-          defaultValue = "") final String propsFilePath,
-      @ShellOption(value = "--hoodieConfigs", help = "Any configuration that 
can be set in the properties file can be passed here in the form of an array",
-          defaultValue = "") final String[] configs) throws Exception {
-
-    (new FormatValidator()).validate("format", format);
-
-    String sparkPropertiesPath =
-        
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
-
-    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-
-    SparkCommand cmd = SparkCommand.IMPORT;
-    if (useUpsert) {
-      cmd = SparkCommand.UPSERT;
-    }
-
-    SparkMain.addAppArgs(sparkLauncher, cmd, master, sparkMemory, srcPath, 
targetPath, tableName, tableType, rowKeyField,
-        partitionPathField, parallelism, schemaFilePath, retry, propsFilePath);
-    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
-    Process process = sparkLauncher.launch();
-    InputStreamConsumer.captureOutput(process);
-    int exitCode = process.waitFor();
-    if (exitCode != 0) {
-      return "Failed to import table to hoodie format";
-    }
-    return "Table imported to hoodie format";
-  }
-}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index d752f9f27b9..5448f155213 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -40,6 +40,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.index.HoodieIndex;
@@ -50,8 +51,6 @@ import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
-import org.apache.hudi.utilities.HDFSParquetImporter;
-import org.apache.hudi.utilities.HDFSParquetImporter.Config;
 import org.apache.hudi.utilities.HoodieCleaner;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.HoodieCompactionAdminTool;
@@ -171,9 +170,7 @@ public class SparkMain {
           break;
         case IMPORT:
         case UPSERT:
-          returnCode = dataLoad(jsc, commandString, args[3], args[4], args[5], 
args[6], args[7], args[8],
-              Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), 
propsFilePath, configs);
-          break;
+          throw new HoodieNotSupportedException("This command is no longer 
supported. Use HoodieStreamer utility instead.");
         case COMPACT_RUN:
           returnCode = compact(jsc, args[3], args[4], args[5], 
Integer.parseInt(args[6]), args[7],
               Integer.parseInt(args[8]), HoodieCompactor.EXECUTE, 
propsFilePath, configs);
@@ -290,24 +287,6 @@ public class SparkMain {
     }
   }
 
-  private static int dataLoad(JavaSparkContext jsc, String command, String 
srcPath, String targetPath, String tableName,
-                              String tableType, String rowKey, String 
partitionKey, int parallelism, String schemaFile,
-                              int retry, String propsFilePath, List<String> 
configs) {
-    Config cfg = new Config();
-    cfg.command = command;
-    cfg.srcPath = srcPath;
-    cfg.targetPath = targetPath;
-    cfg.tableName = tableName;
-    cfg.tableType = tableType;
-    cfg.rowKey = rowKey;
-    cfg.partitionKey = partitionKey;
-    cfg.parallelism = parallelism;
-    cfg.schemaFile = schemaFile;
-    cfg.propsFilePath = propsFilePath;
-    cfg.configs = configs;
-    return new HDFSParquetImporter(cfg).dataImport(jsc, retry);
-  }
-
   private static void doCompactValidate(JavaSparkContext jsc, String basePath, 
String compactionInstant,
                                         String outputPath, int parallelism) 
throws Exception {
     HoodieCompactionAdminTool.Config cfg = new 
HoodieCompactionAdminTool.Config();
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
deleted file mode 100644
index 40d8cc04f58..00000000000
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli.integ;
-
-import org.apache.hudi.cli.HoodieCLI;
-import org.apache.hudi.cli.commands.TableCommand;
-import org.apache.hudi.cli.testutils.HoodieCLIIntegrationTestBase;
-import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
-import org.apache.hudi.utilities.HDFSParquetImporter;
-import org.apache.hudi.utilities.functional.TestHDFSParquetImporter;
-import 
org.apache.hudi.utilities.functional.TestHDFSParquetImporter.HoodieTripModel;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.shell.Shell;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.text.ParseException;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static org.junit.jupiter.api.Assertions.assertAll;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Test class for {@link 
org.apache.hudi.cli.commands.HDFSParquetImportCommand}.
- */
-@Disabled("Disable due to flakiness and feature deprecation.")
-@SpringBootTest(properties = {"spring.shell.interactive.enabled=false", 
"spring.shell.command.script.enabled=false"})
-public class ITTestHDFSParquetImportCommand extends 
HoodieCLIIntegrationTestBase {
-
-  @Autowired
-  private Shell shell;
-  private Path sourcePath;
-  private Path targetPath;
-  private String tableName;
-  private String schemaFile;
-  private String tablePath;
-
-  private List<GenericRecord> insertData;
-  private TestHDFSParquetImporter importer;
-
-  @BeforeEach
-  public void init() throws IOException, ParseException {
-    tableName = "test_table";
-    tablePath = basePath + StoragePath.SEPARATOR + tableName;
-    sourcePath = new Path(basePath, "source");
-    targetPath = new Path(tablePath);
-    schemaFile = new StoragePath(basePath, "file.schema").toString();
-
-    // create schema file
-    try (OutputStream schemaFileOS = storage.create(new 
StoragePath(schemaFile))) {
-      
schemaFileOS.write(getUTF8Bytes(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
-    }
-
-    importer = new TestHDFSParquetImporter();
-    insertData = importer.createInsertRecords(sourcePath);
-  }
-
-  /**
-   * Test case for 'hdfsparquetimport' with insert.
-   */
-  @Test
-  public void testConvertWithInsert() throws IOException {
-    String command = String.format("hdfsparquetimport --srcPath %s 
--targetPath %s --tableName %s "
-        + "--tableType %s --rowKeyField %s" + " --partitionPathField %s 
--parallelism %s "
-        + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s 
--sparkMaster %s",
-        sourcePath.toString(), targetPath.toString(), tableName, 
HoodieTableType.COPY_ON_WRITE.name(),
-        "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1", 
"local");
-
-    Object result = shell.evaluate(() -> command);
-
-    assertAll("Command run success",
-        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
-        () -> assertEquals("Table imported to hoodie format", 
result.toString()));
-
-    // Check hudi table exist
-    String metaPath = targetPath + StoragePath.SEPARATOR + 
HoodieTableMetaClient.METAFOLDER_NAME;
-    assertTrue(Files.exists(Paths.get(metaPath)), "Hoodie table not exist.");
-
-    // Load meta data
-    new TableCommand().connect(targetPath.toString(), false, 2000, 300000, 7,
-        "WAIT_TO_ADJUST_SKEW", 200L, false);
-    metaClient = HoodieCLI.getTableMetaClient();
-
-    assertEquals(1, 
metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should 
only 1 commit.");
-
-    verifyResultData(insertData);
-  }
-
-  /**
-   * Test case for 'hdfsparquetimport' with upsert.
-   */
-  @Test
-  public void testConvertWithUpsert() throws IOException, ParseException {
-    Path upsertFolder = new Path(basePath, "testUpsertSrc");
-    List<GenericRecord> upsertData = 
importer.createUpsertRecords(upsertFolder);
-
-    // first insert records
-    HDFSParquetImporter.Config cfg = 
importer.getHDFSParquetImporterConfig(sourcePath.toString(), tablePath,
-        tableName, HoodieTableType.COPY_ON_WRITE.name(), "_row_key", 
"timestamp", 1, schemaFile);
-    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
-
-    dataImporter.dataImport(jsc, 0);
-
-    // Load meta data
-    new TableCommand().connect(targetPath.toString(), false, 2000, 300000, 7,
-        "WAIT_TO_ADJUST_SKEW", 200L, false);
-    metaClient = HoodieCLI.getTableMetaClient();
-
-    // check if insert instant exist
-    assertEquals(1, 
metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should 
only 1 commit.");
-
-    String command = String.format("hdfsparquetimport --srcPath %s 
--targetPath %s --tableName %s "
-        + "--tableType %s --rowKeyField %s" + " --partitionPathField %s 
--parallelism %s "
-        + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s 
--sparkMaster %s --upsert %s",
-        upsertFolder.toString(), targetPath.toString(), tableName, 
HoodieTableType.COPY_ON_WRITE.name(),
-        "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1", 
"local", "true");
-    Object result = shell.evaluate(() -> command);
-
-    assertAll("Command run success",
-        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
-        () -> assertEquals("Table imported to hoodie format", 
result.toString()));
-
-    // reload meta client
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    assertEquals(2, 
metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should 
have 2 commit.");
-
-    // construct result, remove top 10 and add upsert data.
-    List<GenericRecord> expectData = insertData.subList(11, 96);
-    expectData.addAll(upsertData);
-
-    verifyResultData(expectData);
-  }
-
-  /**
-   * Method to verify result is equals to expect.
-   */
-  private void verifyResultData(List<GenericRecord> expectData) {
-    Dataset<Row> ds = HoodieClientTestUtils.read(jsc, tablePath, sqlContext,
-        storage, tablePath + "/*/*/*/*");
-
-    List<Row> readData =
-        ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", 
"begin_lon", "end_lat",
-            "end_lon").collectAsList();
-    List<HoodieTripModel> result = readData.stream().map(row ->
-            new HoodieTripModel(row.getLong(0), row.getString(1), 
row.getString(2), row.getString(3),
-                row.getDouble(4),
-                row.getDouble(5), row.getDouble(6), row.getDouble(7)))
-        .collect(Collectors.toList());
-
-    List<HoodieTripModel> expected = expectData.stream().map(g ->
-            new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()),
-                g.get("_row_key").toString(),
-            g.get("rider").toString(),
-            g.get("driver").toString(),
-            Double.parseDouble(g.get("begin_lat").toString()),
-            Double.parseDouble(g.get("begin_lon").toString()),
-            Double.parseDouble(g.get("end_lat").toString()),
-            Double.parseDouble(g.get("end_lon").toString())))
-        .collect(Collectors.toList());
-
-    assertAll("Result list equals",
-        () -> assertEquals(expected.size(), result.size()),
-        () -> assertTrue(result.containsAll(expected) && 
expected.containsAll(result)));
-  }
-}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
deleted file mode 100644
index 85af942007f..00000000000
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieJsonPayload;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.utilities.streamer.HoodieStreamer;
-
-import com.beust.jcommander.IValueValidator;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Loads data from Parquet Sources.
- *
- * @see HoodieStreamer
- * @deprecated This utility is deprecated in 0.10.0 and will be removed in 
0.11.0. Use {@link HoodieStreamer.Config#runBootstrap} instead.
- */
-public class HDFSParquetImporter implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(HDFSParquetImporter.class);
-
-  private static final DateTimeFormatter PARTITION_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy/MM/dd")
-      .withZone(ZoneId.systemDefault());
-  private final Config cfg;
-  private transient FileSystem fs;
-  /**
-   * Bag of properties with source, hoodie client, key generator etc.
-   */
-  private TypedProperties props;
-
-  public HDFSParquetImporter(Config cfg) {
-    this.cfg = cfg;
-  }
-
-  public static void main(String[] args) {
-    final Config cfg = new Config();
-    JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
-      cmd.usage();
-      System.exit(1);
-    }
-    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
-    JavaSparkContext jssc =
-        UtilHelpers.buildSparkContext("data-importer-" + cfg.tableName, 
cfg.sparkMaster, cfg.sparkMemory);
-    int exitCode = 0;
-    try {
-      dataImporter.dataImport(jssc, cfg.retry);
-    } catch (Throwable throwable) {
-      exitCode = 1;
-      throw new HoodieException("Failed to run HoodieStreamer ", throwable);
-    } finally {
-      SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc, 
exitCode);
-    }
-
-  }
-
-  private boolean isUpsert() {
-    return "upsert".equalsIgnoreCase(cfg.command);
-  }
-
-  public int dataImport(JavaSparkContext jsc, int retry) {
-    this.fs = HadoopFSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
-    this.props = cfg.propsFilePath == null ? 
UtilHelpers.buildProperties(cfg.configs)
-        : UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), 
cfg.configs).getProps(true);
-    LOG.info("Starting data import with configs : " + props.toString());
-    int ret = -1;
-    try {
-      // Verify that targetPath is not present.
-      if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
-        throw new HoodieIOException(String.format("Make sure %s is not 
present.", cfg.targetPath));
-      }
-      do {
-        ret = dataImport(jsc);
-      } while (ret != 0 && retry-- > 0);
-    } catch (Throwable t) {
-      LOG.error("Import data error", t);
-    }
-    return ret;
-  }
-
-  protected int dataImport(JavaSparkContext jsc) throws IOException {
-    try {
-      if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
-        // cleanup target directory.
-        fs.delete(new Path(cfg.targetPath), true);
-      }
-
-      if (!fs.exists(new Path(cfg.targetPath))) {
-        // Initialize target hoodie table.
-        HoodieTableMetaClient.newTableBuilder()
-            .setTableName(cfg.tableName)
-            .setTableType(cfg.tableType)
-            .setTableVersion(cfg.tableVersion)
-            
.initTable(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()), 
cfg.targetPath);
-      }
-
-      // Get schema.
-      String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
-
-      SparkRDDWriteClient<HoodieRecordPayload> client =
-          UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, 
cfg.parallelism, Option.empty(), props);
-
-      JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = 
buildHoodieRecordsForImport(jsc, schemaStr);
-      // Get instant time.
-      String instantTime = client.startCommit();
-      JavaRDD<WriteStatus> writeResponse = load(client, instantTime, 
hoodieRecords);
-      return UtilHelpers.handleErrors(jsc, instantTime, writeResponse);
-    } catch (Throwable t) {
-      LOG.error("Error occurred.", t);
-    }
-    return -1;
-  }
-
-  protected JavaRDD<HoodieRecord<HoodieRecordPayload>> 
buildHoodieRecordsForImport(JavaSparkContext jsc,
-      String schemaStr) throws IOException {
-    Job job = Job.getInstance(jsc.hadoopConfiguration());
-    // Allow recursive directories to be found
-    job.getConfiguration().set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
-    // To parallelize reading file status.
-    job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, 
"1024");
-    AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new 
Schema.Parser().parse(schemaStr)));
-    ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
-
-    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
-    context.setJobStatus(this.getClass().getSimpleName(), "Build records for 
import: " + cfg.tableName);
-    return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, 
Void.class, GenericRecord.class,
-            job.getConfiguration())
-        // To reduce large number of tasks.
-        .coalesce(16 * cfg.parallelism).map(entry -> {
-          GenericRecord genericRecord = ((scala.Tuple2<Void, GenericRecord>) 
entry)._2();
-          Object partitionField = genericRecord.get(cfg.partitionKey);
-          if (partitionField == null) {
-            throw new HoodieIOException("partition key is missing. :" + 
cfg.partitionKey);
-          }
-          Object rowField = genericRecord.get(cfg.rowKey);
-          if (rowField == null) {
-            throw new HoodieIOException("row field is missing. :" + 
cfg.rowKey);
-          }
-          String partitionPath = partitionField.toString();
-          LOG.debug("Row Key : " + rowField + ", Partition Path is (" + 
partitionPath + ")");
-          if (partitionField instanceof Number) {
-            try {
-              long ts = (long) (Double.parseDouble(partitionField.toString()) 
* 1000L);
-              partitionPath = 
PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
-            } catch (NumberFormatException nfe) {
-              LOG.warn("Unable to parse date from partition field. Assuming 
partition as (" + partitionField + ")");
-            }
-          }
-          return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), 
partitionPath),
-              new HoodieJsonPayload(genericRecord.toString()));
-        });
-  }
-
-  /**
-   * Imports records to Hoodie table.
-   *
-   * @param client Hoodie Client
-   * @param instantTime Instant Time
-   * @param hoodieRecords Hoodie Records
-   * @param <T> Type
-   */
-  protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> 
load(SparkRDDWriteClient<T> client, String instantTime,
-                                                                      
JavaRDD<HoodieRecord<T>> hoodieRecords) {
-    switch (cfg.command.toLowerCase()) {
-      case "upsert": {
-        return client.upsert(hoodieRecords, instantTime);
-      }
-      case "bulkinsert": {
-        return client.bulkInsert(hoodieRecords, instantTime);
-      }
-      default: {
-        return client.insert(hoodieRecords, instantTime);
-      }
-    }
-  }
-
-  public static class CommandValidator implements IValueValidator<String> {
-
-    List<String> validCommands = Arrays.asList("insert", "upsert", 
"bulkinsert");
-
-    @Override
-    public void validate(String name, String value) {
-      if (value == null || !validCommands.contains(value.toLowerCase())) {
-        throw new ParameterException(
-            String.format("Invalid command: value:%s: supported commands:%s", 
value, validCommands));
-      }
-    }
-  }
-
-  public static class FormatValidator implements IValueValidator<String> {
-
-    List<String> validFormats = Collections.singletonList("parquet");
-
-    @Override
-    public void validate(String name, String value) {
-      if (value == null || !validFormats.contains(value)) {
-        throw new ParameterException(
-            String.format("Invalid format type: value:%s: supported 
formats:%s", value, validFormats));
-      }
-    }
-  }
-
-  public static class Config implements Serializable {
-
-    @Parameter(names = {"--command", "-c"}, description = "Write command Valid 
values are insert(default)/upsert/bulkinsert",
-        validateValueWith = CommandValidator.class)
-    public String command = "INSERT";
-    @Parameter(names = {"--src-path", "-sp"}, description = "Base path for the 
input table", required = true)
-    public String srcPath = null;
-    @Parameter(names = {"--target-path", "-tp"}, description = "Base path for 
the target hoodie table",
-        required = true)
-    public String targetPath = null;
-    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", 
required = true)
-    public String tableName = null;
-    @Parameter(names = {"--table-type", "-tt"}, description = "Table type", 
required = true)
-    public String tableType = null;
-    @Parameter(names = {"--table-version", "-tv"}, description = "Table 
version")
-    public int tableVersion = HoodieTableVersion.current().versionCode();
-    @Parameter(names = {"--row-key-field", "-rk"}, description = "Row key 
field name", required = true)
-    public String rowKey = null;
-    @Parameter(names = {"--partition-key-field", "-pk"}, description = 
"Partition key field name", required = true)
-    public String partitionKey = null;
-    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism 
for hoodie insert(default)/upsert/bulkinsert", required = true)
-    public int parallelism = 1;
-    @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro 
schema file", required = true)
-    public String schemaFile = null;
-    @Parameter(names = {"--format", "-f"}, description = "Format for the input 
data.", validateValueWith = FormatValidator.class)
-    public String format = null;
-    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
-    public String sparkMaster = null;
-    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory 
to use", required = true)
-    public String sparkMemory = null;
-    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
-    public int retry = 0;
-    @Parameter(names = {"--props"}, description = "path to properties file on 
localfs or dfs, with configurations for "
-        + "hoodie client for importing")
-    public String propsFilePath = null;
-    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration 
that can be set in the properties file "
-        + "(using the CLI parameter \"--props\") can also be passed command 
line using this parameter. This can be repeated",
-            splitter = IdentitySplitter.class)
-    public List<String> configs = new ArrayList<>();
-    @Parameter(names = {"--help", "-h"}, help = true)
-    public Boolean help = false;
-  }
-}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
deleted file mode 100644
index 115e229cb44..00000000000
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Stream;
-
-import scala.Tuple2;
-
-import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
-import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
-import static org.apache.hudi.utilities.UtilHelpers.buildSparkConf;
-
-/**
- * Hoodie snapshot copy job which copies latest files from all partitions to 
another place, for snapshot backup.
- *
- * @deprecated Use {@link HoodieSnapshotExporter} instead.
- */
-public class HoodieSnapshotCopier implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieSnapshotCopier.class);
-
-  static class Config implements Serializable {
-
-    @Parameter(names = {"--base-path", "-bp"}, description = "Hoodie table 
base path", required = true)
-    String basePath = null;
-
-    @Parameter(names = {"--output-path", "-op"}, description = "The snapshot 
output path", required = true)
-    String outputPath = null;
-
-    @Parameter(names = {"--use-file-listing-from-metadata"}, description = 
"Fetch file listing from Hudi's metadata")
-    public Boolean useFileListingFromMetadata = 
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
-  }
-
-  public void snapshot(JavaSparkContext jsc, String baseDir, final String 
outputDir,
-                       final boolean useFileListingFromMetadata) throws 
IOException {
-    FileSystem fs = HadoopFSUtils.getFs(baseDir, jsc.hadoopConfiguration());
-    final StorageConfiguration<?> storageConf = 
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
-    final HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder()
-        
.setConf(HadoopFSUtils.getStorageConfWithCopy(fs.getConf())).setBasePath(baseDir).build();
-    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
-    final BaseFileOnlyView fsView = 
HoodieTableFileSystemView.fileListingBasedFileSystemView(context, tableMetadata,
-        
tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants());
-    // Get the latest commit
-    Option<HoodieInstant> latestCommit =
-        
tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
-    if (!latestCommit.isPresent()) {
-      LOG.warn("No commits present. Nothing to snapshot");
-      return;
-    }
-    final String latestCommitTimestamp = latestCommit.get().requestedTime();
-    LOG.info(String.format("Starting to snapshot latest version files which 
are also no-late-than %s.",
-        latestCommitTimestamp));
-
-    List<String> partitions = FSUtils.getAllPartitionPaths(
-        context, new HoodieHadoopStorage(fs), baseDir, 
useFileListingFromMetadata);
-    if (partitions.size() > 0) {
-      LOG.info(String.format("The job needs to copy %d partitions.", 
partitions.size()));
-
-      // Make sure the output directory is empty
-      Path outputPath = new Path(outputDir);
-      if (fs.exists(outputPath)) {
-        LOG.warn(String.format("The output path %s targetBasePath already 
exists, deleting", outputPath));
-        fs.delete(new Path(outputDir), true);
-      }
-
-      context.setJobStatus(this.getClass().getSimpleName(), "Creating a 
snapshot: " + baseDir);
-
-      List<Tuple2<String, String>> filesToCopy = context.flatMap(partitions, 
partition -> {
-        // Only take latest version files <= latestCommit.
-        HoodieStorage storage1 = HoodieStorageUtils.getStorage(baseDir, 
storageConf);
-        List<Tuple2<String, String>> filePaths = new ArrayList<>();
-        Stream<HoodieBaseFile> dataFiles = 
fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp);
-        dataFiles.forEach(hoodieDataFile -> filePaths.add(new 
Tuple2<>(partition, hoodieDataFile.getPath())));
-
-        // also need to copy over partition metadata
-        StoragePath partitionMetaFile = 
HoodiePartitionMetadata.getPartitionMetafilePath(storage1,
-            FSUtils.constructAbsolutePath(baseDir, partition)).get();
-        if (storage1.exists(partitionMetaFile)) {
-          filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
-        }
-
-        return filePaths.stream();
-      }, partitions.size());
-
-      context.foreach(filesToCopy, tuple -> {
-        String partition = tuple._1();
-        Path sourceFilePath = new Path(tuple._2());
-        Path toPartitionPath = 
HadoopFSUtils.constructAbsolutePathInHadoopPath(outputDir, partition);
-        FileSystem ifs = HadoopFSUtils.getFs(baseDir, 
storageConf.unwrapCopyAs(Configuration.class));
-
-        if (!ifs.exists(toPartitionPath)) {
-          ifs.mkdirs(toPartitionPath);
-        }
-        FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, 
sourceFilePath.getName()), false,
-            ifs.getConf());
-      }, filesToCopy.size());
-      
-      // Also copy the .commit files
-      LOG.info(String.format("Copying .commit files which are no-late-than 
%s.", latestCommitTimestamp));
-      FileStatus[] commitFilesToCopy =
-          Arrays.stream(fs.listStatus(new Path(baseDir + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME)))
-              .filter(fileStatus -> {
-                Path path = fileStatus.getPath();
-                if 
(path.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
-                  return true;
-                } else {
-                  if (fileStatus.isDirectory()) {
-                    return false;
-                  }
-                  String instantTime = 
tableMetadata.getInstantFileNameParser().extractTimestamp(path.getName());
-                  return compareTimestamps(instantTime, LESSER_THAN_OR_EQUALS, 
latestCommitTimestamp);
-                }
-              }).toArray(FileStatus[]::new);
-      for (FileStatus commitStatus : commitFilesToCopy) {
-        Path targetFilePath =
-            new Path(outputDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME + 
"/" + commitStatus.getPath().getName());
-        if (!fs.exists(targetFilePath.getParent())) {
-          fs.mkdirs(targetFilePath.getParent());
-        }
-        if (fs.exists(targetFilePath)) {
-          LOG.error(
-              String.format("The target output commit file (%s targetBasePath) 
already exists.", targetFilePath));
-        }
-        FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, 
fs.getConf());
-      }
-    } else {
-      LOG.info("The job has 0 partition to copy.");
-    }
-
-    // Create the _SUCCESS tag
-    Path successTagPath = new Path(outputDir + "/_SUCCESS");
-    if (!fs.exists(successTagPath)) {
-      LOG.info(String.format("Creating _SUCCESS under targetBasePath: %s", 
outputDir));
-      fs.createNewFile(successTagPath);
-    }
-  }
-
-  public static void main(String[] args) throws IOException {
-    // Take input configs
-    final Config cfg = new Config();
-    new JCommander(cfg, null, args);
-    LOG.info(String.format("Snapshot hoodie table from %s (source) to %s 
(target)", cfg.basePath,
-        cfg.outputPath));
-
-    // Create a spark job to do the snapshot copy
-    SparkConf sparkConf = buildSparkConf("Hoodie-snapshot-copier", "local[*]");
-    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-    LOG.info("Initializing spark job.");
-
-    // Copy
-    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, cfg.basePath, cfg.outputPath, 
cfg.useFileListingFromMetadata);
-
-    // Stop the job
-    jsc.stop();
-  }
-}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
deleted file mode 100644
index de5ea9493e1..00000000000
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.functional;
-
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineUtils;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
-import org.apache.hudi.testutils.FunctionalTestHarness;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
-import org.apache.hudi.utilities.HDFSParquetImporter;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@Disabled("Disable due to flakiness and feature deprecation.")
-@Tag("functional")
-public class TestHDFSParquetImporter extends FunctionalTestHarness implements 
Serializable {
-
-  private String basePath;
-  private transient StoragePath hoodieFolder;
-  private transient Path srcFolder;
-  private transient List<GenericRecord> insertData;
-
-  @BeforeEach
-  public void init() throws IOException, ParseException {
-    basePath = (new Path(dfsBasePath(), 
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
-
-    // Hoodie root folder.
-    hoodieFolder = new StoragePath(basePath, "testTarget");
-
-    // Create generic records.
-    srcFolder = new Path(basePath, "testSrc");
-    insertData = createInsertRecords(srcFolder);
-  }
-
-  @AfterEach
-  public void clean() throws IOException {
-    hoodieStorage().deleteDirectory(new StoragePath(basePath));
-  }
-
-  /**
-   * Test successful data import with retries.
-   */
-  @Test
-  public void testImportWithRetries() throws Exception {
-    // Create schema file.
-    String schemaFile = new Path(basePath, "file.schema").toString();
-
-    HDFSParquetImporter.Config cfg = 
getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
-        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
-    AtomicInteger retry = new AtomicInteger(3);
-    AtomicInteger fileCreated = new AtomicInteger(0);
-    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) {
-      @Override
-      protected int dataImport(JavaSparkContext jsc) throws IOException {
-        int ret = super.dataImport(jsc);
-        if (retry.decrementAndGet() == 0) {
-          fileCreated.incrementAndGet();
-          createSchemaFile(schemaFile);
-        }
-
-        return ret;
-      }
-    };
-    // Schema file is not created so this operation should fail.
-    assertEquals(0, dataImporter.dataImport(jsc(), retry.get()));
-    assertEquals(-1, retry.get());
-    assertEquals(1, fileCreated.get());
-
-    // Check if
-    // 1. .commit file is present
-    // 2. number of records in each partition == 24
-    // 3. total number of partitions == 4;
-    boolean isCommitFilePresent = false;
-    Map<String, Long> recordCounts = new HashMap<String, Long>();
-    List<StoragePathInfo> hoodieFiles = 
hoodieStorage().listFiles(hoodieFolder);
-    for (StoragePathInfo pathInfo : hoodieFiles) {
-      isCommitFilePresent = isCommitFilePresent
-          || 
pathInfo.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION);
-
-      if (pathInfo.getPath().toString().endsWith("parquet")) {
-        String partitionPath = pathInfo.getPath().getParent().toString();
-        long count = 
sqlContext().read().parquet(pathInfo.getPath().toString()).count();
-        if (!recordCounts.containsKey(partitionPath)) {
-          recordCounts.put(partitionPath, 0L);
-        }
-        recordCounts.put(partitionPath, recordCounts.get(partitionPath) + 
count);
-      }
-    }
-    assertTrue(isCommitFilePresent, "commit file is missing");
-    assertEquals(4, recordCounts.size(), "partition is missing");
-    for (Entry<String, Long> e : recordCounts.entrySet()) {
-      assertEquals(24, e.getValue().longValue(), "missing records");
-    }
-  }
-
-  private void insert(JavaSparkContext jsc) throws IOException {
-    // Create schema file.
-    String schemaFile = new Path(basePath, "file.schema").toString();
-    createSchemaFile(schemaFile);
-
-    HDFSParquetImporter.Config cfg = 
getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
-        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
-    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
-
-    dataImporter.dataImport(jsc, 0);
-  }
-
-  /**
-   * Test successful insert and verify data consistency.
-   */
-  @Test
-  public void testImportWithInsert() throws IOException, ParseException {
-    insert(jsc());
-    Dataset<Row> ds = HoodieClientTestUtils.read(
-        jsc(), basePath + "/testTarget", sqlContext(), hoodieStorage(),
-        basePath + "/testTarget/*/*/*/*");
-
-    List<Row> readData =
-        ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", 
"begin_lon", "end_lat",
-            "end_lon").collectAsList();
-    List<HoodieTripModel> result = readData.stream().map(row ->
-            new HoodieTripModel(row.getLong(0), row.getString(1), 
row.getString(2), row.getString(3),
-                row.getDouble(4),
-                row.getDouble(5), row.getDouble(6), row.getDouble(7)))
-        .collect(Collectors.toList());
-
-    List<HoodieTripModel> expected = insertData.stream().map(g ->
-            new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()),
-                g.get("_row_key").toString(),
-            g.get("rider").toString(),
-            g.get("driver").toString(),
-            Double.parseDouble(g.get("begin_lat").toString()),
-            Double.parseDouble(g.get("begin_lon").toString()),
-            Double.parseDouble(g.get("end_lat").toString()),
-            Double.parseDouble(g.get("end_lon").toString())))
-        .collect(Collectors.toList());
-
-    assertTrue(result.containsAll(expected) && expected.containsAll(result) && 
result.size() == expected.size());
-  }
-
-  /**
-   * Test upsert data and verify data consistency.
-   */
-  @Test
-  public void testImportWithUpsert() throws IOException, ParseException {
-    insert(jsc());
-
-    // Create schema file.
-    String schemaFile = new Path(basePath, "file.schema").toString();
-
-    Path upsertFolder = new Path(basePath, "testUpsertSrc");
-    List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
-
-    HDFSParquetImporter.Config cfg = 
getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
-        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
-    cfg.command = "upsert";
-    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
-
-    dataImporter.dataImport(jsc(), 0);
-
-    // construct result, remove top 10 and add upsert data.
-    List<GenericRecord> expectData = insertData.subList(11, 96);
-    expectData.addAll(upsertData);
-
-    // read latest data
-    Dataset<Row> ds =
-        HoodieClientTestUtils.read(jsc(), basePath + "/testTarget", 
sqlContext(), hoodieStorage(),
-            basePath + "/testTarget/*/*/*/*");
-
-    List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", 
"begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
-    List<HoodieTripModel> result = readData.stream().map(row ->
-        new HoodieTripModel(row.getLong(0), row.getString(1), 
row.getString(2), row.getString(3), row.getDouble(4),
-            row.getDouble(5), row.getDouble(6), row.getDouble(7)))
-        .collect(Collectors.toList());
-
-    // get expected result.
-    List<HoodieTripModel> expected = expectData.stream().map(g ->
-        new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()),
-            g.get("_row_key").toString(),
-            g.get("rider").toString(),
-            g.get("driver").toString(),
-            Double.parseDouble(g.get("begin_lat").toString()),
-            Double.parseDouble(g.get("begin_lon").toString()),
-            Double.parseDouble(g.get("end_lat").toString()),
-            Double.parseDouble(g.get("end_lon").toString())))
-        .collect(Collectors.toList());
-
-    assertTrue(result.containsAll(expected) && expected.containsAll(result) && 
result.size() == expected.size());
-  }
-
-  public List<GenericRecord> createInsertRecords(Path srcFolder) throws 
ParseException, IOException {
-    Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
-    long startTime = 
TimelineUtils.parseDateFromInstantTime("20170203000000").getTime() / 1000;
-    List<GenericRecord> records = new ArrayList<GenericRecord>();
-    for (long recordNum = 0; recordNum < 96; recordNum++) {
-      records.add(new 
HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", 
"rider-" + recordNum, "driver-" + recordNum, startTime + 
TimeUnit.HOURS.toSeconds(recordNum)));
-    }
-    try (ParquetWriter<GenericRecord> writer = 
AvroParquetWriter.<GenericRecord>builder(srcFile)
-        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
-        .withConf(HoodieTestUtils.getDefaultStorageConf().unwrap()).build()) {
-      for (GenericRecord record : records) {
-        writer.write(record);
-      }
-    }
-    return records;
-  }
-
-  public List<GenericRecord> createUpsertRecords(Path srcFolder) throws 
ParseException, IOException {
-    Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
-    long startTime = 
TimelineUtils.parseDateFromInstantTime("20170203000000").getTime() / 1000;
-    List<GenericRecord> records = new ArrayList<GenericRecord>();
-    // 10 for update
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
-    for (long recordNum = 0; recordNum < 11; recordNum++) {
-      records.add(
-          dataGen.generateGenericRecord(Long.toString(recordNum), "0", 
"rider-upsert-" + recordNum,
-              "driver-upsert" + recordNum, startTime + 
TimeUnit.HOURS.toSeconds(recordNum)));
-    }
-    // 4 for insert
-    for (long recordNum = 96; recordNum < 100; recordNum++) {
-      records.add(
-          dataGen.generateGenericRecord(Long.toString(recordNum), "0", 
"rider-upsert-" + recordNum,
-              "driver-upsert" + recordNum, startTime + 
TimeUnit.HOURS.toSeconds(recordNum)));
-    }
-    try (ParquetWriter<GenericRecord> writer = 
AvroParquetWriter.<GenericRecord>builder(srcFile)
-        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
-        .withConf(HoodieTestUtils.getDefaultStorageConf().unwrap()).build()) {
-      for (GenericRecord record : records) {
-        writer.write(record);
-      }
-    }
-    return records;
-  }
-
-  private void createSchemaFile(String schemaFile) throws IOException {
-    OutputStream schemaFileOS = hoodieStorage().create(new 
StoragePath(schemaFile));
-    
schemaFileOS.write(getUTF8Bytes(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
-    schemaFileOS.close();
-  }
-
-  /**
-   * Tests for scheme file. 1. File is missing. 2. File has invalid data.
-   */
-  @Test
-  public void testSchemaFile() throws Exception {
-    // Hoodie root folder
-    StoragePath hoodieFolder = new StoragePath(basePath, "testTarget");
-    StoragePath srcFolder = new StoragePath(basePath.toString(), "srcTest");
-    StoragePath schemaFile = new StoragePath(basePath.toString(), 
"missingFile.schema");
-    HDFSParquetImporter.Config cfg =
-        getHDFSParquetImporterConfig(srcFolder.toString(), 
hoodieFolder.toString(),
-            "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, 
schemaFile.toString());
-    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
-    // Should fail - return : -1.
-    assertEquals(-1, dataImporter.dataImport(jsc(), 0));
-
-    hoodieStorage().create(schemaFile).write(getUTF8Bytes("Random invalid 
schema data"));
-    // Should fail - return : -1.
-    assertEquals(-1, dataImporter.dataImport(jsc(), 0));
-  }
-
-  /**
-   * Test for missing rowKey and partitionKey.
-   */
-  @Test
-  public void testRowAndPartitionKey() throws Exception {
-    // Create schema file.
-    Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
-    createSchemaFile(schemaFile.toString());
-
-    HDFSParquetImporter dataImporter;
-    HDFSParquetImporter.Config cfg;
-
-    // Check for invalid row key.
-    cfg = getHDFSParquetImporterConfig(srcFolder.toString(), 
hoodieFolder.toString(), "testTable", "COPY_ON_WRITE",
-        "invalidRowKey", "timestamp", 1, schemaFile.toString());
-    dataImporter = new HDFSParquetImporter(cfg);
-    assertEquals(-1, dataImporter.dataImport(jsc(), 0));
-
-    // Check for invalid partition key.
-    cfg = getHDFSParquetImporterConfig(srcFolder.toString(), 
hoodieFolder.toString(), "testTable", "COPY_ON_WRITE",
-        "_row_key", "invalidTimeStamp", 1, schemaFile.toString());
-    dataImporter = new HDFSParquetImporter(cfg);
-    assertEquals(-1, dataImporter.dataImport(jsc(), 0));
-  }
-
-  public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String 
srcPath, String targetPath, String tableName,
-      String tableType, String rowKey, String partitionKey, int parallelism, 
String schemaFile) {
-    HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
-    cfg.srcPath = srcPath;
-    cfg.targetPath = targetPath;
-    cfg.tableName = tableName;
-    cfg.tableType = tableType;
-    cfg.rowKey = rowKey;
-    cfg.partitionKey = partitionKey;
-    cfg.parallelism = parallelism;
-    cfg.schemaFile = schemaFile;
-    return cfg;
-  }
-
-  /**
-   * Class used for compare result and expected.
-   */
-  public static class HoodieTripModel {
-
-    long timestamp;
-    String rowKey;
-    String rider;
-    String driver;
-    double beginLat;
-    double beginLon;
-    double endLat;
-    double endLon;
-
-    public HoodieTripModel(long timestamp, String rowKey, String rider, String 
driver, double beginLat,
-        double beginLon, double endLat, double endLon) {
-      this.timestamp = timestamp;
-      this.rowKey = rowKey;
-      this.rider = rider;
-      this.driver = driver;
-      this.beginLat = beginLat;
-      this.beginLon = beginLon;
-      this.endLat = endLat;
-      this.endLon = endLon;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      HoodieTripModel other = (HoodieTripModel) o;
-      return timestamp == other.timestamp && rowKey.equals(other.rowKey) && 
rider.equals(other.rider)
-          && driver.equals(other.driver) && beginLat == other.beginLat && 
beginLon == other.beginLon
-          && endLat == other.endLat && endLon == other.endLon;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(timestamp, rowKey, rider, driver, beginLat, 
beginLon, endLat, endLon);
-    }
-  }
-}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
deleted file mode 100644
index 19f58fadbe3..00000000000
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.functional;
-
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.testutils.FunctionalTestHarness;
-import org.apache.hudi.utilities.HoodieSnapshotCopier;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@Disabled("Disable due to flakiness and feature deprecation.")
-@Tag("functional")
-public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
-
-  private static final String BASE_FILE_EXTENSION = 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
-  private static final String TEST_WRITE_TOKEN = "1-0-1";
-
-  private String basePath;
-  private String outputPath;
-  private FileSystem fs;
-
-  @BeforeEach
-  public void init() throws IOException {
-    // Prepare directories
-    String rootPath = "file://" + tempDir.toString();
-    basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
-    outputPath = rootPath + "/output";
-
-    final StorageConfiguration<?> storageConf = 
HoodieTestUtils.getDefaultStorageConf();
-    fs = HadoopFSUtils.getFs(basePath, storageConf);
-    HoodieTestUtils.init(storageConf, basePath);
-  }
-
-  @Test
-  public void testEmptySnapshotCopy() throws IOException {
-    // There is no real data (only .hoodie directory)
-    assertEquals(fs.listStatus(new Path(basePath)).length, 1);
-    assertFalse(fs.exists(new Path(outputPath)));
-
-    // Do the snapshot
-    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc(), basePath, outputPath,
-        HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS);
-
-    // Nothing changed; we just bail out
-    assertEquals(fs.listStatus(new Path(basePath)).length, 1);
-    assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
-  }
-
-  // TODO - uncomment this after fixing test failures
-  // @Test
-  public void testSnapshotCopy() throws Exception {
-    // Generate some commits and corresponding parquets
-    String commitTime1 = "20160501010101";
-    String commitTime2 = "20160502020601";
-    String commitTime3 = "20160506030611";
-    new File(basePath + "/.hoodie").mkdirs();
-    new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
-    // Only first two have commit files
-    new File(basePath + "/.hoodie/timeline/" + commitTime1 + 
".commit").createNewFile();
-    new File(basePath + "/.hoodie/timeline/" + commitTime2 + 
".commit").createNewFile();
-    new File(basePath + "/.hoodie/timeline/" + commitTime3 + 
".inflight").createNewFile();
-
-    // Some parquet files
-    new File(basePath + "/2016/05/01/").mkdirs();
-    new File(basePath + "/2016/05/02/").mkdirs();
-    new File(basePath + "/2016/05/06/").mkdirs();
-    HoodieTestDataGenerator.writePartitionMetadataDeprecated(hoodieStorage(), 
new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
-        basePath);
-    // Make commit1
-    File file11 = new File(basePath + "/2016/05/01/" + 
FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, "id11", 
BASE_FILE_EXTENSION));
-    file11.createNewFile();
-    File file12 = new File(basePath + "/2016/05/02/" + 
FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, "id12", 
BASE_FILE_EXTENSION));
-    file12.createNewFile();
-    File file13 = new File(basePath + "/2016/05/06/" + 
FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, "id13", 
BASE_FILE_EXTENSION));
-    file13.createNewFile();
-
-    // Make commit2
-    File file21 = new File(basePath + "/2016/05/01/" + 
FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, "id21", 
BASE_FILE_EXTENSION));
-    file21.createNewFile();
-    File file22 = new File(basePath + "/2016/05/02/" + 
FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, "id22", 
BASE_FILE_EXTENSION));
-    file22.createNewFile();
-    File file23 = new File(basePath + "/2016/05/06/" + 
FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, "id23", 
BASE_FILE_EXTENSION));
-    file23.createNewFile();
-
-    // Make commit3
-    File file31 = new File(basePath + "/2016/05/01/" + 
FSUtils.makeBaseFileName(commitTime3, TEST_WRITE_TOKEN, "id31", 
BASE_FILE_EXTENSION));
-    file31.createNewFile();
-    File file32 = new File(basePath + "/2016/05/02/" + 
FSUtils.makeBaseFileName(commitTime3, TEST_WRITE_TOKEN, "id32", 
BASE_FILE_EXTENSION));
-    file32.createNewFile();
-    File file33 = new File(basePath + "/2016/05/06/" + 
FSUtils.makeBaseFileName(commitTime3, TEST_WRITE_TOKEN, "id33", 
BASE_FILE_EXTENSION));
-    file33.createNewFile();
-
-    // Do a snapshot copy
-    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc(), basePath, outputPath, 
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS);
-
-    // Check results
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + 
file11.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + 
file12.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + 
file13.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + 
file21.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + 
file22.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + 
file23.getName())));
-    assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + 
file31.getName())));
-    assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + 
file32.getName())));
-    assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + 
file33.getName())));
-
-    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/timeline/" + 
commitTime1 + ".commit")));
-    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/timeline/" + 
commitTime2 + ".commit")));
-    assertFalse(fs.exists(new Path(outputPath + "/.hoodie/timeline/" + 
commitTime3 + ".commit")));
-    assertFalse(fs.exists(new Path(outputPath + "/.hoodie/timeline/" + 
commitTime3 + ".inflight")));
-    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
-
-    assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
-  }
-}

Reply via email to