xushiyan commented on a change in pull request #2127:
URL: https://github.com/apache/hudi/pull/2127#discussion_r499304034
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
##########
@@ -68,77 +71,232 @@ public void tearDown() throws IOException {
     cleanupResources();
   }
-  @Test
-  public void testSchemaEvolutionOnUpdate() throws Exception {
+  private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException {
     // Create a bunch of records with a old version of schema
     final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt");
     final HoodieSparkTable table = HoodieSparkTable.create(config, context);
-
     final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
-          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
-      String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
-          + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
-      String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
-          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
       List<HoodieRecord> insertRecords = new ArrayList<>();
-      RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
-      insertRecords
-          .add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
-      RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
-      insertRecords
-          .add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
-      RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
-      insertRecords
-          .add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
-
+      for (String recordStr : recordsStrs) {
+        RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+        insertRecords
+            .add(new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange));
+      }
       Map<String, HoodieRecord> insertRecordMap = insertRecords.stream()
           .collect(Collectors.toMap(r -> r.getRecordKey(), Function.identity()));
       HoodieCreateHandle createHandle =
-          new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecordMap, supplier);
+          new HoodieCreateHandle(config, "100", table, insertRecords.get(0).getPartitionPath(), "f1-0", insertRecordMap, supplier);
       createHandle.write();
       return createHandle.close();
     }).collect();
     final Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100"));
     FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()).create(commitFile);
+    return statuses.get(0);
+  }
-    // Now try an update with an evolved schema
-    // Evolved schema does not have guarantee on preserving the original field ordering
-    final HoodieWriteConfig config2 = makeHoodieClientConfig("/exampleEvolvedSchema.txt");
-    final WriteStatus insertResult = statuses.get(0);
+  private List<String> generateMultiRecordsForExampleSchema() {
+    List<String> recordsStrs = new ArrayList<>();
+    String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+    String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+    String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+    recordsStrs.add(recordStr1);
+    recordsStrs.add(recordStr2);
+    recordsStrs.add(recordStr3);
+    return recordsStrs;
+  }
+
+  private List<String> generateOneRecordForExampleSchema() {
+    List<String> recordsStrs = new ArrayList<>();
+    String recordStr = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+    recordsStrs.add(recordStr);
+    return recordsStrs;
+  }
+
+  @Test
+  public void testSchemaEvolutionOnUpdateSuccessWithAddColumnHaveDefault() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateMultiRecordsForExampleSchema());
     String fileId = insertResult.getFileId();
-    final HoodieSparkTable table2 = HoodieSparkTable.create(config, context);
-    assertEquals(1, jsc.parallelize(Arrays.asList(1)).map(x -> {
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchema.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    jsc.parallelize(Arrays.asList(1)).map(x -> {
       // New content with values for the newly added field
-      String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+      String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
       List<HoodieRecord> updateRecords = new ArrayList<>();
-      RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
-      HoodieRecord record1 =
-          new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
-      record1.unseal();
-      record1.setCurrentLocation(new HoodieRecordLocation("100", fileId));
-      record1.seal();
-      updateRecords.add(record1);
-
+      RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+      HoodieRecord record =
+          new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange);
+      record.unseal();
+      record.setCurrentLocation(new HoodieRecordLocation("101", fileId));
+      record.seal();
+      updateRecords.add(record);
       assertDoesNotThrow(() -> {
-        HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2,
-            updateRecords.iterator(), record1.getPartitionPath(), fileId, supplier);
+        HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, "101", table,
+            updateRecords.iterator(), record.getPartitionPath(), fileId, supplier);
         Configuration conf = new Configuration();
         AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchemaWithMetafields());
         List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
-            new Path(config2.getBasePath() + "/" + insertResult.getStat().getPath()));
+            new Path(config.getBasePath() + "/" + insertResult.getStat().getPath()));
         for (GenericRecord rec : oldRecords) {
           mergeHandle.write(rec);
         }
         mergeHandle.close();
       }, "UpdateFunction could not read records written with exampleSchema.txt using the "
          + "exampleEvolvedSchema.txt");
+      return 1;
+    }).collect();
+  }
+
+  @Test
+  public void testSchemaEvolutionOnUpdateSuccessWithChangeColumnOrder() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateMultiRecordsForExampleSchema());
+    String fileId = insertResult.getFileId();
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaChangeOrder.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    jsc.parallelize(Arrays.asList(1)).map(x -> {
+      // New content with values for the newly added field
+      String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"added_field\":1},\"number\":12";
+      List<HoodieRecord> updateRecords = new ArrayList<>();
+      RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+      HoodieRecord record =
+          new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange);
+      record.unseal();
+      record.setCurrentLocation(new HoodieRecordLocation("101", fileId));
+      record.seal();
+      updateRecords.add(record);
+      assertDoesNotThrow(() -> {
+        HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, "101", table,
+            updateRecords.iterator(), record.getPartitionPath(), fileId, supplier);
+        Configuration conf = new Configuration();
+        AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchemaWithMetafields());
+        List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
+            new Path(config.getBasePath() + "/" + insertResult.getStat().getPath()));
+        for (GenericRecord rec : oldRecords) {
+          mergeHandle.write(rec);
+        }
+        mergeHandle.close();
+      }, "UpdateFunction could not read records written with exampleSchema.txt using the "
+          + "exampleEvolvedSchemaChangeOrder.txt as column order change");
       return 1;
-    }).collect().size());
+    }).collect();
+  }
+
+  @Test
+  public void testSchemaEvolutionOnUpdateMisMatchWithDeleteColumn() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateOneRecordForExampleSchema());
+    String fileId = insertResult.getFileId();
+
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaDeleteColumn.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    jsc.parallelize(Arrays.asList(1)).map(x -> {
+      // New content with values for the newly added field
+      String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+          + "\"time\":\"2016-01-31T03:16:41.415Z\"}";
+      List<HoodieRecord> updateRecords = new ArrayList<>();
+      RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+      HoodieRecord record =
+          new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange);
+      record.unseal();
+      record.setCurrentLocation(new HoodieRecordLocation("101", fileId));
+      record.seal();
+      updateRecords.add(record);
+      HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, "101", table,
+          updateRecords.iterator(), record.getPartitionPath(), fileId, supplier);
+      Configuration conf = new Configuration();
+      AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchemaWithMetafields());
+      assertThrows(InvalidRecordException.class, () -> {
+        List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
+            new Path(config.getBasePath() + "/" + insertResult.getStat().getPath()));
+      }, "UpdateFunction when delete column ,Parquet/Avro schema mismatch: Avro field 'xxx' not found");
+      mergeHandle.close();
+      return 1;
+    }).collect();
+  }
+
+  @Test
+  public void testSchemaEvolutionOnUpdateMisMatchWithAddColumnNotHaveDefault() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateOneRecordForExampleSchema());
+    String fileId = insertResult.getFileId();
+
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaColumnRequire.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    jsc.parallelize(Arrays.asList(1)).map(x -> {
+      // New content with values for the newly added field
+      String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
+      List<HoodieRecord> updateRecords = new ArrayList<>();
+      RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+      HoodieRecord record =
+          new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange);
+      record.unseal();
+      record.setCurrentLocation(new HoodieRecordLocation("101", fileId));
+      record.seal();
+      updateRecords.add(record);
+      assertThrows(HoodieUpsertException.class, () -> {
+        HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, "101", table,
+            updateRecords.iterator(), record.getPartitionPath(), fileId, supplier);
+        Configuration conf = new Configuration();
+        AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchemaWithMetafields());
+        List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
+            new Path(config.getBasePath() + "/" + insertResult.getStat().getPath()));
+        for (GenericRecord rec : oldRecords) {
+          mergeHandle.write(rec);
+        }
+        mergeHandle.close();
+      }, "UpdateFunction could not read records written with exampleSchema.txt using the "
+          + "exampleEvolvedSchemaColumnRequire.txt ,because oldrecords do not have required column added_field");
+      return 1;
+    }).collect();
+  }
+
+  @Test
+  public void testSchemaEvolutionOnUpdateMisMatchWithChangeColumnType() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateOneRecordForExampleSchema());
+    String fileId = insertResult.getFileId();
+
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaColumnType.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    jsc.parallelize(Arrays.asList(1)).map(x -> {
Review comment:
```suggestion
jsc.parallelize(Arrays.asList(1)).foreach(x -> {
```
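   With `foreach`, the dummy `return 1;` and the trailing `.collect()` also become unnecessary, since the lambda no longer needs to produce a value. A rough sketch of the resulting shape (test body elided; not verbatim from this PR):
   ```java
   // Hypothetical foreach variant: the lambda runs the assertions for their side
   // effects only, so nothing has to be returned or collected back to the driver.
   jsc.parallelize(Arrays.asList(1)).foreach(x -> {
     // ... build the evolved-schema update record and exercise HoodieMergeHandle ...
   });
   ```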
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
##########
+  @Test
+  public void testSchemaEvolutionOnUpdateMisMatchWithChangeColumnType() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateOneRecordForExampleSchema());
+    String fileId = insertResult.getFileId();
+
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaColumnType.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    jsc.parallelize(Arrays.asList(1)).map(x -> {
+      // New content with values for the newly added field
+      String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":\"12\"}";
+      List<HoodieRecord> updateRecords = new ArrayList<>();
+      RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+      HoodieRecord record =
+          new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange);
+      record.unseal();
Review comment:
   `sealed` defaults to false for a newly constructed record, so we could skip the `unseal()` call here.
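   A minimal sketch of the trimmed construction (same identifiers as in the test, assuming a freshly constructed record really is unsealed):
   ```java
   // Hypothetical trimmed version: set the location on the newly created,
   // still-unsealed record, then seal it before handing it to the merge handle.
   HoodieRecord record =
       new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange);
   record.setCurrentLocation(new HoodieRecordLocation("101", fileId));
   record.seal();
   updateRecords.add(record);
   ```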
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
##########
+  @Test
+  public void testSchemaEvolutionOnUpdateMisMatchWithChangeColumnType() throws Exception {
+    final WriteStatus insertResult = prepareFirstRecordCommit(generateOneRecordForExampleSchema());
+    String fileId = insertResult.getFileId();
+
+    // Now try an update with an evolved schema
+    // Evolved schema does not have guarantee on preserving the original field ordering
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleEvolvedSchemaColumnType.txt");
+    final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    jsc.parallelize(Arrays.asList(1)).map(x -> {
+      // New content with values for the newly added field
+      String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":\"12\"}";
+      List<HoodieRecord> updateRecords = new ArrayList<>();
+      RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
+      HoodieRecord record =
+          new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange);
+      record.unseal();
+      record.setCurrentLocation(new HoodieRecordLocation("101", fileId));
+      record.seal();
+      updateRecords.add(record);
+      HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, "101", table,
+          updateRecords.iterator(), record.getPartitionPath(), fileId, supplier);
+      Configuration conf = new Configuration();
+      AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchemaWithMetafields());
Review comment:
   Instead of creating a new `Configuration`, would it be better to use `table.getHadoopConf()`?
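   Something along these lines, assuming it is acceptable for the test to reuse (and mutate) the table's shared Hadoop configuration:
   ```java
   // Hypothetical variant of the suggestion: reuse the table's Hadoop configuration
   // instead of instantiating a fresh one before setting the Avro read schema.
   Configuration conf = table.getHadoopConf();
   AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchemaWithMetafields());
   ```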
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org