This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8324f6ecb030227beca87d92031cefb08c21ae6e
Author: hongdd <[email protected]>
AuthorDate: Tue Apr 21 14:21:30 2020 +0800

    [HUDI-789]Adjust logic of upsert in HDFSParquetImporter (#1511)
---
 .../apache/hudi/utilities/HDFSParquetImporter.java |  22 +-
 .../hudi/utilities/TestHDFSParquetImporter.java    | 255 ++++++++++++++++-----
 2 files changed, 217 insertions(+), 60 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index e566a3e..8f2b068 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -99,6 +99,10 @@ public class HDFSParquetImporter implements Serializable {
   }
 
+  private boolean isUpsert() {
+    return "upsert".equals(cfg.command.toLowerCase());
+  }
+
   public int dataImport(JavaSparkContext jsc, int retry) {
     this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
     this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
@@ -107,7 +111,7 @@
     int ret = -1;
     try {
       // Verify that targetPath is not present.
-      if (fs.exists(new Path(cfg.targetPath))) {
+      if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
         throw new HoodieIOException(String.format("Make sure %s is not present.", cfg.targetPath));
       }
       do {
@@ -121,20 +125,22 @@
 
   protected int dataImport(JavaSparkContext jsc) throws IOException {
     try {
-      if (fs.exists(new Path(cfg.targetPath))) {
+      if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
         // cleanup target directory.
         fs.delete(new Path(cfg.targetPath), true);
       }
 
+      if (!fs.exists(new Path(cfg.targetPath))) {
+        // Initialize target hoodie table.
+        Properties properties = new Properties();
+        properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
+        properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
+        HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties);
+      }
+
       // Get schema.
       String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
 
-      // Initialize target hoodie table.
-      Properties properties = new Properties();
-      properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
-      properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
-      HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties);
-
       HoodieWriteClient client =
           UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, cfg.parallelism, Option.empty(), props);
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
index e4aac06..060e1c6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
@@ -37,8 +38,13 @@ import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
+
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -50,8 +56,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -75,34 +83,43 @@ public class TestHDFSParquetImporter implements Serializable {
   }
 
   @AfterClass
-  public static void cleanupClass() throws Exception {
+  public static void cleanupClass() {
     if (hdfsTestService != null) {
       hdfsTestService.stop();
     }
   }
 
+  private String basePath;
+  private transient Path hoodieFolder;
+  private transient Path srcFolder;
+  private transient List<GenericRecord> insertData;
+
+  @Before
+  public void init() throws IOException, ParseException {
+    basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+
+    // Hoodie root folder.
+    hoodieFolder = new Path(basePath, "testTarget");
+
+    // Create generic records.
+    srcFolder = new Path(basePath, "testSrc");
+    insertData = createInsertRecords(srcFolder);
+  }
+
+  @After
+  public void clean() throws IOException {
+    dfs.delete(new Path(basePath), true);
+  }
+
   /**
    * Test successful data import with retries.
   */
   @Test
   public void testImportWithRetries() throws Exception {
-    JavaSparkContext jsc = null;
-    try {
-      jsc = getJavaSparkContext();
-
-      // Test root folder.
-      String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
-
-      // Hoodie root folder
-      Path hoodieFolder = new Path(basePath, "testTarget");
-
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
      // Create schema file.
      String schemaFile = new Path(basePath, "file.schema").toString();
 
-      // Create generic records.
-      Path srcFolder = new Path(basePath, "testSrc");
-      createRecords(srcFolder);
-
       HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
           "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
       AtomicInteger retry = new AtomicInteger(3);
@@ -150,14 +167,104 @@
       for (Entry<String, Long> e : recordCounts.entrySet()) {
         assertEquals("missing records", 24, e.getValue().longValue());
       }
-    } finally {
-      if (jsc != null) {
-        jsc.stop();
-      }
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportWithInsert() throws IOException, ParseException {
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieTripModel> result = readData.stream().map(row ->
+          new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieTripModel> expected = insertData.stream().map(g ->
+          new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+              g.get("_row_key").toString(),
+              g.get("rider").toString(),
+              g.get("driver").toString(),
+              Double.parseDouble(g.get("begin_lat").toString()),
+              Double.parseDouble(g.get("begin_lon").toString()),
+              Double.parseDouble(g.get("end_lat").toString()),
+              Double.parseDouble(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    }
+  }
+
+  /**
+   * Test upsert data and verify data consistency.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
+      insert(jsc);
+
+      // Create schema file.
+      String schemaFile = new Path(basePath, "file.schema").toString();
+
+      Path upsertFolder = new Path(basePath, "testUpsertSrc");
+      List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+          "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+      cfg.command = "upsert";
+      HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+      dataImporter.dataImport(jsc, 0);
+
+      // construct result, remove top 10 and add upsert data.
+      List<GenericRecord> expectData = insertData.subList(11, 96);
+      expectData.addAll(upsertData);
+
+      // read latest data
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieTripModel> result = readData.stream().map(row ->
+          new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      // get expected result.
+      List<HoodieTripModel> expected = expectData.stream().map(g ->
+          new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+              g.get("_row_key").toString(),
+              g.get("rider").toString(),
+              g.get("driver").toString(),
+              Double.parseDouble(g.get("begin_lat").toString()),
+              Double.parseDouble(g.get("begin_lon").toString()),
+              Double.parseDouble(g.get("end_lat").toString()),
+              Double.parseDouble(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    }
+  }
+
+  private List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
     Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
     long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
     List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -165,12 +272,36 @@
       records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-" + recordNum,
           "driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
     }
-    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
-        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
-    for (GenericRecord record : records) {
-      writer.write(record);
+    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+      for (GenericRecord record : records) {
+        writer.write(record);
+      }
     }
-    writer.close();
+    return records;
+  }
+
+  private List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
+    Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
+    long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
+    List<GenericRecord> records = new ArrayList<GenericRecord>();
+    // 10 for update
+    for (long recordNum = 0; recordNum < 11; recordNum++) {
+      records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+          "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+    }
+    // 4 for insert
+    for (long recordNum = 96; recordNum < 100; recordNum++) {
+      records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+          "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+    }
+    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+      for (GenericRecord record : records) {
+        writer.write(record);
+      }
+    }
+    return records;
   }
 
   private void createSchemaFile(String schemaFile) throws IOException {
@@ -184,12 +315,7 @@
    */
   @Test
   public void testSchemaFile() throws Exception {
-    JavaSparkContext jsc = null;
-    try {
-      jsc = getJavaSparkContext();
-
-      // Test root folder.
-      String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
       // Hoodie root folder
       Path hoodieFolder = new Path(basePath, "testTarget");
       Path srcFolder = new Path(basePath.toString(), "srcTest");
@@ -204,10 +330,6 @@
 
       // Should fail - return : -1.
       assertEquals(-1, dataImporter.dataImport(jsc, 0));
-    } finally {
-      if (jsc != null) {
-        jsc.stop();
-      }
     }
   }
 
@@ -216,19 +338,7 @@
    */
   @Test
   public void testRowAndPartitionKey() throws Exception {
-    JavaSparkContext jsc = null;
-    try {
-      jsc = getJavaSparkContext();
-
-      // Test root folder.
-      String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
-      // Hoodie root folder
-      Path hoodieFolder = new Path(basePath, "testTarget");
-
-      // Create generic records.
-      Path srcFolder = new Path(basePath, "testSrc");
-      createRecords(srcFolder);
-
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
       // Create schema file.
       Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
       createSchemaFile(schemaFile.toString());
@@ -248,10 +358,6 @@
 
       dataImporter = new HDFSParquetImporter(cfg);
       assertEquals(-1, dataImporter.dataImport(jsc, 0));
-    } finally {
-      if (jsc != null) {
-        jsc.stop();
-      }
     }
   }
 
@@ -275,4 +381,49 @@
     sparkConf = HoodieWriteClient.registerClasses(sparkConf);
     return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
   }
+
+  /**
+   * Class used for compare result and expected.
+   */
+  public static class HoodieTripModel {
+    double timestamp;
+    String rowKey;
+    String rider;
+    String driver;
+    double beginLat;
+    double beginLon;
+    double endLat;
+    double endLon;
+
+    private HoodieTripModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+        double beginLon, double endLat, double endLon) {
+      this.timestamp = timestamp;
+      this.rowKey = rowKey;
+      this.rider = rider;
+      this.driver = driver;
+      this.beginLat = beginLat;
+      this.beginLon = beginLon;
+      this.endLat = endLat;
+      this.endLon = endLon;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      HoodieTripModel other = (HoodieTripModel) o;
+      return timestamp == other.timestamp && rowKey.equals(other.rowKey) && rider.equals(other.rider)
+          && driver.equals(other.driver) && beginLat == other.beginLat && beginLon == other.beginLon
+          && endLat == other.endLat && endLon == other.endLon;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(timestamp, rowKey, rider, driver, beginLat, beginLon, endLat, endLon);
+    }
+  }
 }
