This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new aeebfcfcec2 [HUDI-8697] Revisit TestHDFSParquetImporter and
TestHoodieSnapshotCopier (#12695)
aeebfcfcec2 is described below
commit aeebfcfcec271e8ff8f37e7f7ef2418d386f4c76
Author: Vova Kolmakov <[email protected]>
AuthorDate: Fri Mar 7 08:26:21 2025 +0700
[HUDI-8697] Revisit TestHDFSParquetImporter and TestHoodieSnapshotCopier
(#12695)
Co-authored-by: Vova Kolmakov <[email protected]>
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../cli/commands/HDFSParquetImportCommand.java | 93 -----
.../org/apache/hudi/cli/commands/SparkMain.java | 25 +-
.../cli/integ/ITTestHDFSParquetImportCommand.java | 200 -----------
.../apache/hudi/utilities/HDFSParquetImporter.java | 301 ----------------
.../hudi/utilities/HoodieSnapshotCopier.java | 210 -----------
.../functional/TestHDFSParquetImporter.java | 394 ---------------------
.../functional/TestHoodieSnapshotCopier.java | 151 --------
7 files changed, 2 insertions(+), 1372 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
deleted file mode 100644
index dafd15b174a..00000000000
---
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli.commands;
-
-import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
-import org.apache.hudi.cli.utils.InputStreamConsumer;
-import org.apache.hudi.cli.utils.SparkUtil;
-import org.apache.hudi.utilities.HDFSParquetImporter.FormatValidator;
-import org.apache.hudi.utilities.UtilHelpers;
-import org.apache.hudi.utilities.streamer.HoodieStreamer;
-
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.util.Utils;
-import org.springframework.shell.standard.ShellComponent;
-import org.springframework.shell.standard.ShellMethod;
-import org.springframework.shell.standard.ShellOption;
-
-import scala.collection.JavaConverters;
-
-/**
- * CLI command for importing parquet table to hudi table.
- *
- * @see HoodieStreamer
- * @deprecated This utility is deprecated in 0.10.0 and will be removed in
0.11.0. Use {@link HoodieStreamer.Config#runBootstrap} instead.
- */
-@ShellComponent
-public class HDFSParquetImportCommand {
-
- @ShellMethod(key = "hdfsparquetimport", value = "Imports Parquet table to a
hoodie table")
- public String convert(
- @ShellOption(value = "--upsert", defaultValue = "false",
- help = "Uses upsert API instead of the default insert API of
WriteClient") boolean useUpsert,
- @ShellOption(value = "--srcPath", help = "Base path for the input
table") final String srcPath,
- @ShellOption(value = "--targetPath",
- help = "Base path for the target hoodie table") final String
targetPath,
- @ShellOption(value = "--tableName", help = "Table name") final String
tableName,
- @ShellOption(value = "--tableType", help = "Table type") final String
tableType,
- @ShellOption(value = "--rowKeyField", help = "Row key field name") final
String rowKeyField,
- @ShellOption(value = "--partitionPathField", defaultValue = "",
- help = "Partition path field name") final String partitionPathField,
- @ShellOption(value = {"--parallelism"},
- help = "Parallelism for hoodie insert") final String parallelism,
- @ShellOption(value = "--schemaFilePath",
- help = "Path for Avro schema file") final String schemaFilePath,
- @ShellOption(value = "--format", help = "Format for the input data")
final String format,
- @ShellOption(value = "--sparkMaster", defaultValue = "", help = "Spark
Master") String master,
- @ShellOption(value = "--sparkMemory", help = "Spark executor memory")
final String sparkMemory,
- @ShellOption(value = "--retry", help = "Number of retries") final String
retry,
- @ShellOption(value = "--propsFilePath", help = "path to properties file
on localfs or dfs with configurations for hoodie client for importing",
- defaultValue = "") final String propsFilePath,
- @ShellOption(value = "--hoodieConfigs", help = "Any configuration that
can be set in the properties file can be passed here in the form of an array",
- defaultValue = "") final String[] configs) throws Exception {
-
- (new FormatValidator()).validate("format", format);
-
- String sparkPropertiesPath =
-
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
-
- SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-
- SparkCommand cmd = SparkCommand.IMPORT;
- if (useUpsert) {
- cmd = SparkCommand.UPSERT;
- }
-
- SparkMain.addAppArgs(sparkLauncher, cmd, master, sparkMemory, srcPath,
targetPath, tableName, tableType, rowKeyField,
- partitionPathField, parallelism, schemaFilePath, retry, propsFilePath);
- UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
- Process process = sparkLauncher.launch();
- InputStreamConsumer.captureOutput(process);
- int exitCode = process.waitFor();
- if (exitCode != 0) {
- return "Failed to import table to hoodie format";
- }
- return "Table imported to hoodie format";
- }
-}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index d752f9f27b9..5448f155213 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -40,6 +40,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
@@ -50,8 +51,6 @@ import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
-import org.apache.hudi.utilities.HDFSParquetImporter;
-import org.apache.hudi.utilities.HDFSParquetImporter.Config;
import org.apache.hudi.utilities.HoodieCleaner;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieCompactionAdminTool;
@@ -171,9 +170,7 @@ public class SparkMain {
break;
case IMPORT:
case UPSERT:
- returnCode = dataLoad(jsc, commandString, args[3], args[4], args[5],
args[6], args[7], args[8],
- Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]),
propsFilePath, configs);
- break;
+ throw new HoodieNotSupportedException("This command is no longer
supported. Use HoodieStreamer utility instead.");
case COMPACT_RUN:
returnCode = compact(jsc, args[3], args[4], args[5],
Integer.parseInt(args[6]), args[7],
Integer.parseInt(args[8]), HoodieCompactor.EXECUTE,
propsFilePath, configs);
@@ -290,24 +287,6 @@ public class SparkMain {
}
}
- private static int dataLoad(JavaSparkContext jsc, String command, String
srcPath, String targetPath, String tableName,
- String tableType, String rowKey, String
partitionKey, int parallelism, String schemaFile,
- int retry, String propsFilePath, List<String>
configs) {
- Config cfg = new Config();
- cfg.command = command;
- cfg.srcPath = srcPath;
- cfg.targetPath = targetPath;
- cfg.tableName = tableName;
- cfg.tableType = tableType;
- cfg.rowKey = rowKey;
- cfg.partitionKey = partitionKey;
- cfg.parallelism = parallelism;
- cfg.schemaFile = schemaFile;
- cfg.propsFilePath = propsFilePath;
- cfg.configs = configs;
- return new HDFSParquetImporter(cfg).dataImport(jsc, retry);
- }
-
private static void doCompactValidate(JavaSparkContext jsc, String basePath,
String compactionInstant,
String outputPath, int parallelism)
throws Exception {
HoodieCompactionAdminTool.Config cfg = new
HoodieCompactionAdminTool.Config();
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
deleted file mode 100644
index 40d8cc04f58..00000000000
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cli.integ;
-
-import org.apache.hudi.cli.HoodieCLI;
-import org.apache.hudi.cli.commands.TableCommand;
-import org.apache.hudi.cli.testutils.HoodieCLIIntegrationTestBase;
-import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
-import org.apache.hudi.utilities.HDFSParquetImporter;
-import org.apache.hudi.utilities.functional.TestHDFSParquetImporter;
-import
org.apache.hudi.utilities.functional.TestHDFSParquetImporter.HoodieTripModel;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.shell.Shell;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.text.ParseException;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static org.junit.jupiter.api.Assertions.assertAll;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Test class for {@link
org.apache.hudi.cli.commands.HDFSParquetImportCommand}.
- */
-@Disabled("Disable due to flakiness and feature deprecation.")
-@SpringBootTest(properties = {"spring.shell.interactive.enabled=false",
"spring.shell.command.script.enabled=false"})
-public class ITTestHDFSParquetImportCommand extends
HoodieCLIIntegrationTestBase {
-
- @Autowired
- private Shell shell;
- private Path sourcePath;
- private Path targetPath;
- private String tableName;
- private String schemaFile;
- private String tablePath;
-
- private List<GenericRecord> insertData;
- private TestHDFSParquetImporter importer;
-
- @BeforeEach
- public void init() throws IOException, ParseException {
- tableName = "test_table";
- tablePath = basePath + StoragePath.SEPARATOR + tableName;
- sourcePath = new Path(basePath, "source");
- targetPath = new Path(tablePath);
- schemaFile = new StoragePath(basePath, "file.schema").toString();
-
- // create schema file
- try (OutputStream schemaFileOS = storage.create(new
StoragePath(schemaFile))) {
-
schemaFileOS.write(getUTF8Bytes(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
- }
-
- importer = new TestHDFSParquetImporter();
- insertData = importer.createInsertRecords(sourcePath);
- }
-
- /**
- * Test case for 'hdfsparquetimport' with insert.
- */
- @Test
- public void testConvertWithInsert() throws IOException {
- String command = String.format("hdfsparquetimport --srcPath %s
--targetPath %s --tableName %s "
- + "--tableType %s --rowKeyField %s" + " --partitionPathField %s
--parallelism %s "
- + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s
--sparkMaster %s",
- sourcePath.toString(), targetPath.toString(), tableName,
HoodieTableType.COPY_ON_WRITE.name(),
- "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1",
"local");
-
- Object result = shell.evaluate(() -> command);
-
- assertAll("Command run success",
- () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
- () -> assertEquals("Table imported to hoodie format",
result.toString()));
-
- // Check hudi table exist
- String metaPath = targetPath + StoragePath.SEPARATOR +
HoodieTableMetaClient.METAFOLDER_NAME;
- assertTrue(Files.exists(Paths.get(metaPath)), "Hoodie table not exist.");
-
- // Load meta data
- new TableCommand().connect(targetPath.toString(), false, 2000, 300000, 7,
- "WAIT_TO_ADJUST_SKEW", 200L, false);
- metaClient = HoodieCLI.getTableMetaClient();
-
- assertEquals(1,
metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should
only 1 commit.");
-
- verifyResultData(insertData);
- }
-
- /**
- * Test case for 'hdfsparquetimport' with upsert.
- */
- @Test
- public void testConvertWithUpsert() throws IOException, ParseException {
- Path upsertFolder = new Path(basePath, "testUpsertSrc");
- List<GenericRecord> upsertData =
importer.createUpsertRecords(upsertFolder);
-
- // first insert records
- HDFSParquetImporter.Config cfg =
importer.getHDFSParquetImporterConfig(sourcePath.toString(), tablePath,
- tableName, HoodieTableType.COPY_ON_WRITE.name(), "_row_key",
"timestamp", 1, schemaFile);
- HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
-
- dataImporter.dataImport(jsc, 0);
-
- // Load meta data
- new TableCommand().connect(targetPath.toString(), false, 2000, 300000, 7,
- "WAIT_TO_ADJUST_SKEW", 200L, false);
- metaClient = HoodieCLI.getTableMetaClient();
-
- // check if insert instant exist
- assertEquals(1,
metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should
only 1 commit.");
-
- String command = String.format("hdfsparquetimport --srcPath %s
--targetPath %s --tableName %s "
- + "--tableType %s --rowKeyField %s" + " --partitionPathField %s
--parallelism %s "
- + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s
--sparkMaster %s --upsert %s",
- upsertFolder.toString(), targetPath.toString(), tableName,
HoodieTableType.COPY_ON_WRITE.name(),
- "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1",
"local", "true");
- Object result = shell.evaluate(() -> command);
-
- assertAll("Command run success",
- () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
- () -> assertEquals("Table imported to hoodie format",
result.toString()));
-
- // reload meta client
- metaClient = HoodieTableMetaClient.reload(metaClient);
- assertEquals(2,
metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should
have 2 commit.");
-
- // construct result, remove top 10 and add upsert data.
- List<GenericRecord> expectData = insertData.subList(11, 96);
- expectData.addAll(upsertData);
-
- verifyResultData(expectData);
- }
-
- /**
- * Method to verify result is equals to expect.
- */
- private void verifyResultData(List<GenericRecord> expectData) {
- Dataset<Row> ds = HoodieClientTestUtils.read(jsc, tablePath, sqlContext,
- storage, tablePath + "/*/*/*/*");
-
- List<Row> readData =
- ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat",
"begin_lon", "end_lat",
- "end_lon").collectAsList();
- List<HoodieTripModel> result = readData.stream().map(row ->
- new HoodieTripModel(row.getLong(0), row.getString(1),
row.getString(2), row.getString(3),
- row.getDouble(4),
- row.getDouble(5), row.getDouble(6), row.getDouble(7)))
- .collect(Collectors.toList());
-
- List<HoodieTripModel> expected = expectData.stream().map(g ->
- new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()),
- g.get("_row_key").toString(),
- g.get("rider").toString(),
- g.get("driver").toString(),
- Double.parseDouble(g.get("begin_lat").toString()),
- Double.parseDouble(g.get("begin_lon").toString()),
- Double.parseDouble(g.get("end_lat").toString()),
- Double.parseDouble(g.get("end_lon").toString())))
- .collect(Collectors.toList());
-
- assertAll("Result list equals",
- () -> assertEquals(expected.size(), result.size()),
- () -> assertTrue(result.containsAll(expected) &&
expected.containsAll(result)));
- }
-}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
deleted file mode 100644
index 85af942007f..00000000000
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieJsonPayload;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.utilities.streamer.HoodieStreamer;
-
-import com.beust.jcommander.IValueValidator;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Loads data from Parquet Sources.
- *
- * @see HoodieStreamer
- * @deprecated This utility is deprecated in 0.10.0 and will be removed in
0.11.0. Use {@link HoodieStreamer.Config#runBootstrap} instead.
- */
-public class HDFSParquetImporter implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(HDFSParquetImporter.class);
-
- private static final DateTimeFormatter PARTITION_FORMATTER =
DateTimeFormatter.ofPattern("yyyy/MM/dd")
- .withZone(ZoneId.systemDefault());
- private final Config cfg;
- private transient FileSystem fs;
- /**
- * Bag of properties with source, hoodie client, key generator etc.
- */
- private TypedProperties props;
-
- public HDFSParquetImporter(Config cfg) {
- this.cfg = cfg;
- }
-
- public static void main(String[] args) {
- final Config cfg = new Config();
- JCommander cmd = new JCommander(cfg, null, args);
- if (cfg.help || args.length == 0) {
- cmd.usage();
- System.exit(1);
- }
- HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
- JavaSparkContext jssc =
- UtilHelpers.buildSparkContext("data-importer-" + cfg.tableName,
cfg.sparkMaster, cfg.sparkMemory);
- int exitCode = 0;
- try {
- dataImporter.dataImport(jssc, cfg.retry);
- } catch (Throwable throwable) {
- exitCode = 1;
- throw new HoodieException("Failed to run HoodieStreamer ", throwable);
- } finally {
- SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc,
exitCode);
- }
-
- }
-
- private boolean isUpsert() {
- return "upsert".equalsIgnoreCase(cfg.command);
- }
-
- public int dataImport(JavaSparkContext jsc, int retry) {
- this.fs = HadoopFSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
- this.props = cfg.propsFilePath == null ?
UtilHelpers.buildProperties(cfg.configs)
- : UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath),
cfg.configs).getProps(true);
- LOG.info("Starting data import with configs : " + props.toString());
- int ret = -1;
- try {
- // Verify that targetPath is not present.
- if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
- throw new HoodieIOException(String.format("Make sure %s is not
present.", cfg.targetPath));
- }
- do {
- ret = dataImport(jsc);
- } while (ret != 0 && retry-- > 0);
- } catch (Throwable t) {
- LOG.error("Import data error", t);
- }
- return ret;
- }
-
- protected int dataImport(JavaSparkContext jsc) throws IOException {
- try {
- if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
- // cleanup target directory.
- fs.delete(new Path(cfg.targetPath), true);
- }
-
- if (!fs.exists(new Path(cfg.targetPath))) {
- // Initialize target hoodie table.
- HoodieTableMetaClient.newTableBuilder()
- .setTableName(cfg.tableName)
- .setTableType(cfg.tableType)
- .setTableVersion(cfg.tableVersion)
-
.initTable(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()),
cfg.targetPath);
- }
-
- // Get schema.
- String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
-
- SparkRDDWriteClient<HoodieRecordPayload> client =
- UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr,
cfg.parallelism, Option.empty(), props);
-
- JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords =
buildHoodieRecordsForImport(jsc, schemaStr);
- // Get instant time.
- String instantTime = client.startCommit();
- JavaRDD<WriteStatus> writeResponse = load(client, instantTime,
hoodieRecords);
- return UtilHelpers.handleErrors(jsc, instantTime, writeResponse);
- } catch (Throwable t) {
- LOG.error("Error occurred.", t);
- }
- return -1;
- }
-
- protected JavaRDD<HoodieRecord<HoodieRecordPayload>>
buildHoodieRecordsForImport(JavaSparkContext jsc,
- String schemaStr) throws IOException {
- Job job = Job.getInstance(jsc.hadoopConfiguration());
- // Allow recursive directories to be found
- job.getConfiguration().set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
- // To parallelize reading file status.
- job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS,
"1024");
- AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new
Schema.Parser().parse(schemaStr)));
- ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
-
- HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
- context.setJobStatus(this.getClass().getSimpleName(), "Build records for
import: " + cfg.tableName);
- return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class,
Void.class, GenericRecord.class,
- job.getConfiguration())
- // To reduce large number of tasks.
- .coalesce(16 * cfg.parallelism).map(entry -> {
- GenericRecord genericRecord = ((scala.Tuple2<Void, GenericRecord>)
entry)._2();
- Object partitionField = genericRecord.get(cfg.partitionKey);
- if (partitionField == null) {
- throw new HoodieIOException("partition key is missing. :" +
cfg.partitionKey);
- }
- Object rowField = genericRecord.get(cfg.rowKey);
- if (rowField == null) {
- throw new HoodieIOException("row field is missing. :" +
cfg.rowKey);
- }
- String partitionPath = partitionField.toString();
- LOG.debug("Row Key : " + rowField + ", Partition Path is (" +
partitionPath + ")");
- if (partitionField instanceof Number) {
- try {
- long ts = (long) (Double.parseDouble(partitionField.toString())
* 1000L);
- partitionPath =
PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
- } catch (NumberFormatException nfe) {
- LOG.warn("Unable to parse date from partition field. Assuming
partition as (" + partitionField + ")");
- }
- }
- return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(),
partitionPath),
- new HoodieJsonPayload(genericRecord.toString()));
- });
- }
-
- /**
- * Imports records to Hoodie table.
- *
- * @param client Hoodie Client
- * @param instantTime Instant Time
- * @param hoodieRecords Hoodie Records
- * @param <T> Type
- */
- protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus>
load(SparkRDDWriteClient<T> client, String instantTime,
-
JavaRDD<HoodieRecord<T>> hoodieRecords) {
- switch (cfg.command.toLowerCase()) {
- case "upsert": {
- return client.upsert(hoodieRecords, instantTime);
- }
- case "bulkinsert": {
- return client.bulkInsert(hoodieRecords, instantTime);
- }
- default: {
- return client.insert(hoodieRecords, instantTime);
- }
- }
- }
-
- public static class CommandValidator implements IValueValidator<String> {
-
- List<String> validCommands = Arrays.asList("insert", "upsert",
"bulkinsert");
-
- @Override
- public void validate(String name, String value) {
- if (value == null || !validCommands.contains(value.toLowerCase())) {
- throw new ParameterException(
- String.format("Invalid command: value:%s: supported commands:%s",
value, validCommands));
- }
- }
- }
-
- public static class FormatValidator implements IValueValidator<String> {
-
- List<String> validFormats = Collections.singletonList("parquet");
-
- @Override
- public void validate(String name, String value) {
- if (value == null || !validFormats.contains(value)) {
- throw new ParameterException(
- String.format("Invalid format type: value:%s: supported
formats:%s", value, validFormats));
- }
- }
- }
-
- public static class Config implements Serializable {
-
- @Parameter(names = {"--command", "-c"}, description = "Write command Valid
values are insert(default)/upsert/bulkinsert",
- validateValueWith = CommandValidator.class)
- public String command = "INSERT";
- @Parameter(names = {"--src-path", "-sp"}, description = "Base path for the
input table", required = true)
- public String srcPath = null;
- @Parameter(names = {"--target-path", "-tp"}, description = "Base path for
the target hoodie table",
- required = true)
- public String targetPath = null;
- @Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
- public String tableName = null;
- @Parameter(names = {"--table-type", "-tt"}, description = "Table type",
required = true)
- public String tableType = null;
- @Parameter(names = {"--table-version", "-tv"}, description = "Table
version")
- public int tableVersion = HoodieTableVersion.current().versionCode();
- @Parameter(names = {"--row-key-field", "-rk"}, description = "Row key
field name", required = true)
- public String rowKey = null;
- @Parameter(names = {"--partition-key-field", "-pk"}, description =
"Partition key field name", required = true)
- public String partitionKey = null;
- @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for hoodie insert(default)/upsert/bulkinsert", required = true)
- public int parallelism = 1;
- @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro
schema file", required = true)
- public String schemaFile = null;
- @Parameter(names = {"--format", "-f"}, description = "Format for the input
data.", validateValueWith = FormatValidator.class)
- public String format = null;
- @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
- public String sparkMaster = null;
- @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = true)
- public String sparkMemory = null;
- @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
- public int retry = 0;
- @Parameter(names = {"--props"}, description = "path to properties file on
localfs or dfs, with configurations for "
- + "hoodie client for importing")
- public String propsFilePath = null;
- @Parameter(names = {"--hoodie-conf"}, description = "Any configuration
that can be set in the properties file "
- + "(using the CLI parameter \"--props\") can also be passed command
line using this parameter. This can be repeated",
- splitter = IdentitySplitter.class)
- public List<String> configs = new ArrayList<>();
- @Parameter(names = {"--help", "-h"}, help = true)
- public Boolean help = false;
- }
-}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
deleted file mode 100644
index 115e229cb44..00000000000
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Stream;
-
-import scala.Tuple2;
-
-import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
-import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
-import static org.apache.hudi.utilities.UtilHelpers.buildSparkConf;
-
-/**
- * Hoodie snapshot copy job which copies latest files from all partitions to
another place, for snapshot backup.
- *
- * @deprecated Use {@link HoodieSnapshotExporter} instead.
- */
-public class HoodieSnapshotCopier implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieSnapshotCopier.class);
-
- static class Config implements Serializable {
-
- @Parameter(names = {"--base-path", "-bp"}, description = "Hoodie table
base path", required = true)
- String basePath = null;
-
- @Parameter(names = {"--output-path", "-op"}, description = "The snapshot
output path", required = true)
- String outputPath = null;
-
- @Parameter(names = {"--use-file-listing-from-metadata"}, description =
"Fetch file listing from Hudi's metadata")
- public Boolean useFileListingFromMetadata =
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
- }
-
- public void snapshot(JavaSparkContext jsc, String baseDir, final String
outputDir,
- final boolean useFileListingFromMetadata) throws
IOException {
- FileSystem fs = HadoopFSUtils.getFs(baseDir, jsc.hadoopConfiguration());
- final StorageConfiguration<?> storageConf =
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
- final HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder()
-
.setConf(HadoopFSUtils.getStorageConfWithCopy(fs.getConf())).setBasePath(baseDir).build();
- HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
- final BaseFileOnlyView fsView =
HoodieTableFileSystemView.fileListingBasedFileSystemView(context, tableMetadata,
-
tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants());
- // Get the latest commit
- Option<HoodieInstant> latestCommit =
-
tableMetadata.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
- if (!latestCommit.isPresent()) {
- LOG.warn("No commits present. Nothing to snapshot");
- return;
- }
- final String latestCommitTimestamp = latestCommit.get().requestedTime();
- LOG.info(String.format("Starting to snapshot latest version files which
are also no-late-than %s.",
- latestCommitTimestamp));
-
- List<String> partitions = FSUtils.getAllPartitionPaths(
- context, new HoodieHadoopStorage(fs), baseDir,
useFileListingFromMetadata);
- if (partitions.size() > 0) {
- LOG.info(String.format("The job needs to copy %d partitions.",
partitions.size()));
-
- // Make sure the output directory is empty
- Path outputPath = new Path(outputDir);
- if (fs.exists(outputPath)) {
- LOG.warn(String.format("The output path %s targetBasePath already
exists, deleting", outputPath));
- fs.delete(new Path(outputDir), true);
- }
-
- context.setJobStatus(this.getClass().getSimpleName(), "Creating a
snapshot: " + baseDir);
-
- List<Tuple2<String, String>> filesToCopy = context.flatMap(partitions,
partition -> {
- // Only take latest version files <= latestCommit.
- HoodieStorage storage1 = HoodieStorageUtils.getStorage(baseDir,
storageConf);
- List<Tuple2<String, String>> filePaths = new ArrayList<>();
- Stream<HoodieBaseFile> dataFiles =
fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp);
- dataFiles.forEach(hoodieDataFile -> filePaths.add(new
Tuple2<>(partition, hoodieDataFile.getPath())));
-
- // also need to copy over partition metadata
- StoragePath partitionMetaFile =
HoodiePartitionMetadata.getPartitionMetafilePath(storage1,
- FSUtils.constructAbsolutePath(baseDir, partition)).get();
- if (storage1.exists(partitionMetaFile)) {
- filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
- }
-
- return filePaths.stream();
- }, partitions.size());
-
- context.foreach(filesToCopy, tuple -> {
- String partition = tuple._1();
- Path sourceFilePath = new Path(tuple._2());
- Path toPartitionPath =
HadoopFSUtils.constructAbsolutePathInHadoopPath(outputDir, partition);
- FileSystem ifs = HadoopFSUtils.getFs(baseDir,
storageConf.unwrapCopyAs(Configuration.class));
-
- if (!ifs.exists(toPartitionPath)) {
- ifs.mkdirs(toPartitionPath);
- }
- FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath,
sourceFilePath.getName()), false,
- ifs.getConf());
- }, filesToCopy.size());
-
- // Also copy the .commit files
- LOG.info(String.format("Copying .commit files which are no-late-than
%s.", latestCommitTimestamp));
- FileStatus[] commitFilesToCopy =
- Arrays.stream(fs.listStatus(new Path(baseDir + "/" +
HoodieTableMetaClient.METAFOLDER_NAME)))
- .filter(fileStatus -> {
- Path path = fileStatus.getPath();
- if
(path.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
- return true;
- } else {
- if (fileStatus.isDirectory()) {
- return false;
- }
- String instantTime =
tableMetadata.getInstantFileNameParser().extractTimestamp(path.getName());
- return compareTimestamps(instantTime, LESSER_THAN_OR_EQUALS,
latestCommitTimestamp);
- }
- }).toArray(FileStatus[]::new);
- for (FileStatus commitStatus : commitFilesToCopy) {
- Path targetFilePath =
- new Path(outputDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME +
"/" + commitStatus.getPath().getName());
- if (!fs.exists(targetFilePath.getParent())) {
- fs.mkdirs(targetFilePath.getParent());
- }
- if (fs.exists(targetFilePath)) {
- LOG.error(
- String.format("The target output commit file (%s targetBasePath)
already exists.", targetFilePath));
- }
- FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false,
fs.getConf());
- }
- } else {
- LOG.info("The job has 0 partition to copy.");
- }
-
- // Create the _SUCCESS tag
- Path successTagPath = new Path(outputDir + "/_SUCCESS");
- if (!fs.exists(successTagPath)) {
- LOG.info(String.format("Creating _SUCCESS under targetBasePath: %s",
outputDir));
- fs.createNewFile(successTagPath);
- }
- }
-
- public static void main(String[] args) throws IOException {
- // Take input configs
- final Config cfg = new Config();
- new JCommander(cfg, null, args);
- LOG.info(String.format("Snapshot hoodie table from %s (source) to %s
(target)", cfg.basePath,
- cfg.outputPath));
-
- // Create a spark job to do the snapshot copy
- SparkConf sparkConf = buildSparkConf("Hoodie-snapshot-copier", "local[*]");
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- LOG.info("Initializing spark job.");
-
- // Copy
- HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
- copier.snapshot(jsc, cfg.basePath, cfg.outputPath,
cfg.useFileListingFromMetadata);
-
- // Stop the job
- jsc.stop();
- }
-}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
deleted file mode 100644
index de5ea9493e1..00000000000
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.functional;
-
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineUtils;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
-import org.apache.hudi.testutils.FunctionalTestHarness;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
-import org.apache.hudi.utilities.HDFSParquetImporter;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@Disabled("Disable due to flakiness and feature deprecation.")
-@Tag("functional")
-public class TestHDFSParquetImporter extends FunctionalTestHarness implements
Serializable {
-
- private String basePath;
- private transient StoragePath hoodieFolder;
- private transient Path srcFolder;
- private transient List<GenericRecord> insertData;
-
- @BeforeEach
- public void init() throws IOException, ParseException {
- basePath = (new Path(dfsBasePath(),
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
-
- // Hoodie root folder.
- hoodieFolder = new StoragePath(basePath, "testTarget");
-
- // Create generic records.
- srcFolder = new Path(basePath, "testSrc");
- insertData = createInsertRecords(srcFolder);
- }
-
- @AfterEach
- public void clean() throws IOException {
- hoodieStorage().deleteDirectory(new StoragePath(basePath));
- }
-
- /**
- * Test successful data import with retries.
- */
- @Test
- public void testImportWithRetries() throws Exception {
- // Create schema file.
- String schemaFile = new Path(basePath, "file.schema").toString();
-
- HDFSParquetImporter.Config cfg =
getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
- "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
- AtomicInteger retry = new AtomicInteger(3);
- AtomicInteger fileCreated = new AtomicInteger(0);
- HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) {
- @Override
- protected int dataImport(JavaSparkContext jsc) throws IOException {
- int ret = super.dataImport(jsc);
- if (retry.decrementAndGet() == 0) {
- fileCreated.incrementAndGet();
- createSchemaFile(schemaFile);
- }
-
- return ret;
- }
- };
- // Schema file is not created so this operation should fail.
- assertEquals(0, dataImporter.dataImport(jsc(), retry.get()));
- assertEquals(-1, retry.get());
- assertEquals(1, fileCreated.get());
-
- // Check if
- // 1. .commit file is present
- // 2. number of records in each partition == 24
- // 3. total number of partitions == 4;
- boolean isCommitFilePresent = false;
- Map<String, Long> recordCounts = new HashMap<String, Long>();
- List<StoragePathInfo> hoodieFiles =
hoodieStorage().listFiles(hoodieFolder);
- for (StoragePathInfo pathInfo : hoodieFiles) {
- isCommitFilePresent = isCommitFilePresent
- ||
pathInfo.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION);
-
- if (pathInfo.getPath().toString().endsWith("parquet")) {
- String partitionPath = pathInfo.getPath().getParent().toString();
- long count =
sqlContext().read().parquet(pathInfo.getPath().toString()).count();
- if (!recordCounts.containsKey(partitionPath)) {
- recordCounts.put(partitionPath, 0L);
- }
- recordCounts.put(partitionPath, recordCounts.get(partitionPath) +
count);
- }
- }
- assertTrue(isCommitFilePresent, "commit file is missing");
- assertEquals(4, recordCounts.size(), "partition is missing");
- for (Entry<String, Long> e : recordCounts.entrySet()) {
- assertEquals(24, e.getValue().longValue(), "missing records");
- }
- }
-
- private void insert(JavaSparkContext jsc) throws IOException {
- // Create schema file.
- String schemaFile = new Path(basePath, "file.schema").toString();
- createSchemaFile(schemaFile);
-
- HDFSParquetImporter.Config cfg =
getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
- "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
- HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
-
- dataImporter.dataImport(jsc, 0);
- }
-
- /**
- * Test successful insert and verify data consistency.
- */
- @Test
- public void testImportWithInsert() throws IOException, ParseException {
- insert(jsc());
- Dataset<Row> ds = HoodieClientTestUtils.read(
- jsc(), basePath + "/testTarget", sqlContext(), hoodieStorage(),
- basePath + "/testTarget/*/*/*/*");
-
- List<Row> readData =
- ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat",
"begin_lon", "end_lat",
- "end_lon").collectAsList();
- List<HoodieTripModel> result = readData.stream().map(row ->
- new HoodieTripModel(row.getLong(0), row.getString(1),
row.getString(2), row.getString(3),
- row.getDouble(4),
- row.getDouble(5), row.getDouble(6), row.getDouble(7)))
- .collect(Collectors.toList());
-
- List<HoodieTripModel> expected = insertData.stream().map(g ->
- new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()),
- g.get("_row_key").toString(),
- g.get("rider").toString(),
- g.get("driver").toString(),
- Double.parseDouble(g.get("begin_lat").toString()),
- Double.parseDouble(g.get("begin_lon").toString()),
- Double.parseDouble(g.get("end_lat").toString()),
- Double.parseDouble(g.get("end_lon").toString())))
- .collect(Collectors.toList());
-
- assertTrue(result.containsAll(expected) && expected.containsAll(result) &&
result.size() == expected.size());
- }
-
- /**
- * Test upsert data and verify data consistency.
- */
- @Test
- public void testImportWithUpsert() throws IOException, ParseException {
- insert(jsc());
-
- // Create schema file.
- String schemaFile = new Path(basePath, "file.schema").toString();
-
- Path upsertFolder = new Path(basePath, "testUpsertSrc");
- List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
-
- HDFSParquetImporter.Config cfg =
getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
- "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
- cfg.command = "upsert";
- HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
-
- dataImporter.dataImport(jsc(), 0);
-
- // construct result, remove top 10 and add upsert data.
- List<GenericRecord> expectData = insertData.subList(11, 96);
- expectData.addAll(upsertData);
-
- // read latest data
- Dataset<Row> ds =
- HoodieClientTestUtils.read(jsc(), basePath + "/testTarget",
sqlContext(), hoodieStorage(),
- basePath + "/testTarget/*/*/*/*");
-
- List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver",
"begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
- List<HoodieTripModel> result = readData.stream().map(row ->
- new HoodieTripModel(row.getLong(0), row.getString(1),
row.getString(2), row.getString(3), row.getDouble(4),
- row.getDouble(5), row.getDouble(6), row.getDouble(7)))
- .collect(Collectors.toList());
-
- // get expected result.
- List<HoodieTripModel> expected = expectData.stream().map(g ->
- new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()),
- g.get("_row_key").toString(),
- g.get("rider").toString(),
- g.get("driver").toString(),
- Double.parseDouble(g.get("begin_lat").toString()),
- Double.parseDouble(g.get("begin_lon").toString()),
- Double.parseDouble(g.get("end_lat").toString()),
- Double.parseDouble(g.get("end_lon").toString())))
- .collect(Collectors.toList());
-
- assertTrue(result.containsAll(expected) && expected.containsAll(result) &&
result.size() == expected.size());
- }
-
- public List<GenericRecord> createInsertRecords(Path srcFolder) throws
ParseException, IOException {
- Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
- long startTime =
TimelineUtils.parseDateFromInstantTime("20170203000000").getTime() / 1000;
- List<GenericRecord> records = new ArrayList<GenericRecord>();
- for (long recordNum = 0; recordNum < 96; recordNum++) {
- records.add(new
HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0",
"rider-" + recordNum, "driver-" + recordNum, startTime +
TimeUnit.HOURS.toSeconds(recordNum)));
- }
- try (ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(srcFile)
- .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
- .withConf(HoodieTestUtils.getDefaultStorageConf().unwrap()).build()) {
- for (GenericRecord record : records) {
- writer.write(record);
- }
- }
- return records;
- }
-
- public List<GenericRecord> createUpsertRecords(Path srcFolder) throws
ParseException, IOException {
- Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
- long startTime =
TimelineUtils.parseDateFromInstantTime("20170203000000").getTime() / 1000;
- List<GenericRecord> records = new ArrayList<GenericRecord>();
- // 10 for update
- HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
- for (long recordNum = 0; recordNum < 11; recordNum++) {
- records.add(
- dataGen.generateGenericRecord(Long.toString(recordNum), "0",
"rider-upsert-" + recordNum,
- "driver-upsert" + recordNum, startTime +
TimeUnit.HOURS.toSeconds(recordNum)));
- }
- // 4 for insert
- for (long recordNum = 96; recordNum < 100; recordNum++) {
- records.add(
- dataGen.generateGenericRecord(Long.toString(recordNum), "0",
"rider-upsert-" + recordNum,
- "driver-upsert" + recordNum, startTime +
TimeUnit.HOURS.toSeconds(recordNum)));
- }
- try (ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(srcFile)
- .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
- .withConf(HoodieTestUtils.getDefaultStorageConf().unwrap()).build()) {
- for (GenericRecord record : records) {
- writer.write(record);
- }
- }
- return records;
- }
-
- private void createSchemaFile(String schemaFile) throws IOException {
- OutputStream schemaFileOS = hoodieStorage().create(new
StoragePath(schemaFile));
-
schemaFileOS.write(getUTF8Bytes(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
- schemaFileOS.close();
- }
-
- /**
- * Tests for scheme file. 1. File is missing. 2. File has invalid data.
- */
- @Test
- public void testSchemaFile() throws Exception {
- // Hoodie root folder
- StoragePath hoodieFolder = new StoragePath(basePath, "testTarget");
- StoragePath srcFolder = new StoragePath(basePath.toString(), "srcTest");
- StoragePath schemaFile = new StoragePath(basePath.toString(),
"missingFile.schema");
- HDFSParquetImporter.Config cfg =
- getHDFSParquetImporterConfig(srcFolder.toString(),
hoodieFolder.toString(),
- "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1,
schemaFile.toString());
- HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
- // Should fail - return : -1.
- assertEquals(-1, dataImporter.dataImport(jsc(), 0));
-
- hoodieStorage().create(schemaFile).write(getUTF8Bytes("Random invalid
schema data"));
- // Should fail - return : -1.
- assertEquals(-1, dataImporter.dataImport(jsc(), 0));
- }
-
- /**
- * Test for missing rowKey and partitionKey.
- */
- @Test
- public void testRowAndPartitionKey() throws Exception {
- // Create schema file.
- Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
- createSchemaFile(schemaFile.toString());
-
- HDFSParquetImporter dataImporter;
- HDFSParquetImporter.Config cfg;
-
- // Check for invalid row key.
- cfg = getHDFSParquetImporterConfig(srcFolder.toString(),
hoodieFolder.toString(), "testTable", "COPY_ON_WRITE",
- "invalidRowKey", "timestamp", 1, schemaFile.toString());
- dataImporter = new HDFSParquetImporter(cfg);
- assertEquals(-1, dataImporter.dataImport(jsc(), 0));
-
- // Check for invalid partition key.
- cfg = getHDFSParquetImporterConfig(srcFolder.toString(),
hoodieFolder.toString(), "testTable", "COPY_ON_WRITE",
- "_row_key", "invalidTimeStamp", 1, schemaFile.toString());
- dataImporter = new HDFSParquetImporter(cfg);
- assertEquals(-1, dataImporter.dataImport(jsc(), 0));
- }
-
- public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String
srcPath, String targetPath, String tableName,
- String tableType, String rowKey, String partitionKey, int parallelism,
String schemaFile) {
- HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
- cfg.srcPath = srcPath;
- cfg.targetPath = targetPath;
- cfg.tableName = tableName;
- cfg.tableType = tableType;
- cfg.rowKey = rowKey;
- cfg.partitionKey = partitionKey;
- cfg.parallelism = parallelism;
- cfg.schemaFile = schemaFile;
- return cfg;
- }
-
- /**
- * Class used for compare result and expected.
- */
- public static class HoodieTripModel {
-
- long timestamp;
- String rowKey;
- String rider;
- String driver;
- double beginLat;
- double beginLon;
- double endLat;
- double endLon;
-
- public HoodieTripModel(long timestamp, String rowKey, String rider, String
driver, double beginLat,
- double beginLon, double endLat, double endLon) {
- this.timestamp = timestamp;
- this.rowKey = rowKey;
- this.rider = rider;
- this.driver = driver;
- this.beginLat = beginLat;
- this.beginLon = beginLon;
- this.endLat = endLat;
- this.endLon = endLon;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- HoodieTripModel other = (HoodieTripModel) o;
- return timestamp == other.timestamp && rowKey.equals(other.rowKey) &&
rider.equals(other.rider)
- && driver.equals(other.driver) && beginLat == other.beginLat &&
beginLon == other.beginLon
- && endLat == other.endLat && endLon == other.endLon;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(timestamp, rowKey, rider, driver, beginLat,
beginLon, endLat, endLon);
- }
- }
-}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
deleted file mode 100644
index 19f58fadbe3..00000000000
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.functional;
-
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.testutils.FunctionalTestHarness;
-import org.apache.hudi.utilities.HoodieSnapshotCopier;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@Disabled("Disable due to flakiness and feature deprecation.")
-@Tag("functional")
-public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
-
- private static final String BASE_FILE_EXTENSION =
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
- private static final String TEST_WRITE_TOKEN = "1-0-1";
-
- private String basePath;
- private String outputPath;
- private FileSystem fs;
-
- @BeforeEach
- public void init() throws IOException {
- // Prepare directories
- String rootPath = "file://" + tempDir.toString();
- basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
- outputPath = rootPath + "/output";
-
- final StorageConfiguration<?> storageConf =
HoodieTestUtils.getDefaultStorageConf();
- fs = HadoopFSUtils.getFs(basePath, storageConf);
- HoodieTestUtils.init(storageConf, basePath);
- }
-
- @Test
- public void testEmptySnapshotCopy() throws IOException {
- // There is no real data (only .hoodie directory)
- assertEquals(fs.listStatus(new Path(basePath)).length, 1);
- assertFalse(fs.exists(new Path(outputPath)));
-
- // Do the snapshot
- HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
- copier.snapshot(jsc(), basePath, outputPath,
- HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS);
-
- // Nothing changed; we just bail out
- assertEquals(fs.listStatus(new Path(basePath)).length, 1);
- assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
- }
-
- // TODO - uncomment this after fixing test failures
- // @Test
- public void testSnapshotCopy() throws Exception {
- // Generate some commits and corresponding parquets
- String commitTime1 = "20160501010101";
- String commitTime2 = "20160502020601";
- String commitTime3 = "20160506030611";
- new File(basePath + "/.hoodie").mkdirs();
- new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
- // Only first two have commit files
- new File(basePath + "/.hoodie/timeline/" + commitTime1 +
".commit").createNewFile();
- new File(basePath + "/.hoodie/timeline/" + commitTime2 +
".commit").createNewFile();
- new File(basePath + "/.hoodie/timeline/" + commitTime3 +
".inflight").createNewFile();
-
- // Some parquet files
- new File(basePath + "/2016/05/01/").mkdirs();
- new File(basePath + "/2016/05/02/").mkdirs();
- new File(basePath + "/2016/05/06/").mkdirs();
- HoodieTestDataGenerator.writePartitionMetadataDeprecated(hoodieStorage(),
new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
- basePath);
- // Make commit1
- File file11 = new File(basePath + "/2016/05/01/" +
FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, "id11",
BASE_FILE_EXTENSION));
- file11.createNewFile();
- File file12 = new File(basePath + "/2016/05/02/" +
FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, "id12",
BASE_FILE_EXTENSION));
- file12.createNewFile();
- File file13 = new File(basePath + "/2016/05/06/" +
FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, "id13",
BASE_FILE_EXTENSION));
- file13.createNewFile();
-
- // Make commit2
- File file21 = new File(basePath + "/2016/05/01/" +
FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, "id21",
BASE_FILE_EXTENSION));
- file21.createNewFile();
- File file22 = new File(basePath + "/2016/05/02/" +
FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, "id22",
BASE_FILE_EXTENSION));
- file22.createNewFile();
- File file23 = new File(basePath + "/2016/05/06/" +
FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, "id23",
BASE_FILE_EXTENSION));
- file23.createNewFile();
-
- // Make commit3
- File file31 = new File(basePath + "/2016/05/01/" +
FSUtils.makeBaseFileName(commitTime3, TEST_WRITE_TOKEN, "id31",
BASE_FILE_EXTENSION));
- file31.createNewFile();
- File file32 = new File(basePath + "/2016/05/02/" +
FSUtils.makeBaseFileName(commitTime3, TEST_WRITE_TOKEN, "id32",
BASE_FILE_EXTENSION));
- file32.createNewFile();
- File file33 = new File(basePath + "/2016/05/06/" +
FSUtils.makeBaseFileName(commitTime3, TEST_WRITE_TOKEN, "id33",
BASE_FILE_EXTENSION));
- file33.createNewFile();
-
- // Do a snapshot copy
- HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
- copier.snapshot(jsc(), basePath, outputPath,
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS);
-
- // Check results
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" +
file11.getName())));
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" +
file12.getName())));
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" +
file13.getName())));
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" +
file21.getName())));
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" +
file22.getName())));
- assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" +
file23.getName())));
- assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" +
file31.getName())));
- assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" +
file32.getName())));
- assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" +
file33.getName())));
-
- assertTrue(fs.exists(new Path(outputPath + "/.hoodie/timeline/" +
commitTime1 + ".commit")));
- assertTrue(fs.exists(new Path(outputPath + "/.hoodie/timeline/" +
commitTime2 + ".commit")));
- assertFalse(fs.exists(new Path(outputPath + "/.hoodie/timeline/" +
commitTime3 + ".commit")));
- assertFalse(fs.exists(new Path(outputPath + "/.hoodie/timeline/" +
commitTime3 + ".inflight")));
- assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
-
- assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
- }
-}