hddong commented on a change in pull request #1511: [HUDI-789]Adjust logic of
upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r410005068
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws
IOException {
}
}
- private void createRecords(Path srcFolder) throws ParseException,
IOException {
+ private void insert(JavaSparkContext jsc) throws IOException, ParseException
{
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg =
getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath +
"/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider",
"driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1),
row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieModel> expected = insertData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()),
g.get("_row_key").toString(), g.get("rider").toString(),
g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()),
Double.valueOf(g.get("begin_lon").toString()),
Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result)
&& result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ /**
+ * Test upsert data and verify data consistency.
+ */
+ @Test
+ public void testImportWithUpsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+
+ Path upsertFolder = new Path(basePath, "testUpsertSrc");
+ List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+ HDFSParquetImporter.Config cfg =
getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1,
schemaFile);
+ cfg.command = "upsert";
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+
+ // construct result, remove top 10 and add upsert data.
+ List<GenericRecord> expectData = insertData.subList(11, 96);
+ expectData.addAll(upsertData);
+
+ // read latest data
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath +
"/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider",
"driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1),
row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ // get expected result
+ List<HoodieModel> expected = expectData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()),
g.get("_row_key").toString(), g.get("rider").toString(),
g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()),
Double.valueOf(g.get("begin_lon").toString()),
Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result)
&& result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ private List<GenericRecord> createRecords(Path srcFolder) throws
ParseException, IOException {
Review comment:
> Is this method used to init records for inserting? IMO, we should
distinguish it from upsert.
Yes, it is for inserting only; `upsert` has its own method.
https://github.com/apache/incubator-hudi/blob/e1a47ff32f900d9c723dc907784210ce756915f9/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java#L284-L286
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services