yanghua commented on a change in pull request #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r411053511
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -275,4 +380,70 @@ private JavaSparkContext getJavaSparkContext() {
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
}
+
+ /**
+ * Class used to compare the actual result with the expected one.
+ */
+ private class HoodieModel {
Review comment:
Can we mark this class as `static` and rename it to `HoodieTripModel`?
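For illustration, a minimal sketch of what the suggestion amounts to (field list abbreviated; constructor, `equals`, and `hashCode` stay as in this diff):

```java
// Sketch of the suggested change. Marking the nested class static means it
// keeps no implicit reference to the enclosing TestHDFSParquetImporter
// instance, which it never uses anyway.
private static class HoodieTripModel {
  double timestamp;
  String rowKey;
  // ... remaining fields, constructor, equals and hashCode unchanged
}
```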
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -275,4 +380,70 @@ private JavaSparkContext getJavaSparkContext() {
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
}
+
+ /**
+ * Class used to compare the actual result with the expected one.
+ */
+ private class HoodieModel {
+ double timestamp;
+ String rowKey;
+ String rider;
+ String driver;
+ double beginLat;
+ double beginLon;
+ double endLat;
+ double endLon;
+
+ private HoodieModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+ double beginLon, double endLat, double endLon) {
+ this.timestamp = timestamp;
+ this.rowKey = rowKey;
+ this.rider = rider;
+ this.driver = driver;
+ this.beginLat = beginLat;
+ this.beginLon = beginLon;
+ this.endLat = endLat;
+ this.endLon = endLon;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HoodieModel other = (HoodieModel) o;
+ return timestamp == other.timestamp && rowKey.equals(other.rowKey) && rider.equals(other.rider)
+ && driver.equals(other.driver) && beginLat == other.beginLat && beginLon == other.beginLon
+ && endLat == other.endLat && endLon == other.endLon;
+ }
+
+ @Override
+ public int hashCode() {
Review comment:
It's a bit long. I would suggest using `Objects.hash(...)` here.
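A minimal sketch of what that could look like, assuming `java.util.Objects` is imported:

```java
// Objects.hash autoboxes the primitive doubles and combines all fields,
// replacing a hand-written accumulation.
@Override
public int hashCode() {
  return Objects.hash(timestamp, rowKey, rider, driver, beginLat, beginLon, endLat, endLon);
}
```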
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4), row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieModel> expected = insertData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ /**
+ * Test upsert data and verify data consistency.
+ */
+ @Test
+ public void testImportWithUpsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+
+ Path upsertFolder = new Path(basePath, "testUpsertSrc");
+ List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ cfg.command = "upsert";
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+
+ // construct result, remove top 10 and add upsert data.
+ List<GenericRecord> expectData = insertData.subList(11, 96);
+ expectData.addAll(upsertData);
+
+ // read latest data
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4), row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ // get expected result
+ List<HoodieModel> expected = expectData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ private List<GenericRecord> createRecords(Path srcFolder) throws ParseException, IOException {
Review comment:
For naming consistency with `createUpsertRecords`, can we rename it to `createInsertRecords`?
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -171,6 +277,30 @@ private void createRecords(Path srcFolder) throws ParseException, IOException {
writer.write(record);
}
writer.close();
+ return records;
+ }
+
+ private List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
+ Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
+ long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
+ List<GenericRecord> records = new ArrayList<GenericRecord>();
+ // 10 for update
+ for (long recordNum = 0; recordNum < 11; recordNum++) {
+ records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+ "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+ }
+ // 4 for insert
+ for (long recordNum = 96; recordNum < 100; recordNum++) {
+ records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+ "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+ }
+ ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+ .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
+ for (GenericRecord record : records) {
+ writer.write(record);
Review comment:
This method throws `IOException`, so the `writer.close()` call should be wrapped in a `finally` block or, better, a try-with-resources statement. The same issue exists in the `createRecords` method.
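A minimal sketch of the try-with-resources variant, reusing the builder call from this diff (`ParquetWriter` implements `Closeable`, so the writer is closed even if a `write` call throws):

```java
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
    .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
    .withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
  // The writer is closed automatically when this block exits, normally or not.
  for (GenericRecord record : records) {
    writer.write(record);
  }
}
```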
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -150,14 +166,104 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
for (Entry<String, Long> e : recordCounts.entrySet()) {
assertEquals("missing records", 24, e.getValue().longValue());
}
- } finally {
- if (jsc != null) {
- jsc.stop();
- }
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
Review comment:
Rename to `testImportWithInsert`, to match `testImportWithUpsert`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]