prashantwason commented on code in PR #8526:
URL: https://github.com/apache/hudi/pull/8526#discussion_r1203417127
##########
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -2571,6 +2579,174 @@ public void
testDataBlockFormatAppendAndReadWithProjectedSchema(
}
}
+ @Test
+ public void testCorruptBlocks() throws IOException, URISyntaxException,
InterruptedException {
+ List<byte[]> corruptBlocks = createCorruptDataBlocksForTest();
+ int maxTotalBlocks = 20;
+ int runs = 10;
+ Writer writer =
+
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+ HoodieLogFile logFile = writer.getLogFile();
+ Path logPath = logFile.getPath();
+ fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
+
+ for (; runs >= 0; runs--) {
+ int totalBlocksForThisRun = rand.nextInt(maxTotalBlocks) + 1;
+ fs.create(logPath).close();
+ boolean[] verify = new boolean[totalBlocksForThisRun];
+ List<List<IndexedRecord>> goodBlocks = new ArrayList<>();
+ for (int i = 0; i < totalBlocksForThisRun; i++) {
+ boolean pickCorruptBlock = rand.nextBoolean();
+ if (pickCorruptBlock) {
+ byte[] corruptBlock =
corruptBlocks.get(rand.nextInt(corruptBlocks.size()));
+ FSDataOutputStream outputStream = fs.append(logPath);
+ outputStream.write(corruptBlock);
+ outputStream.close();
+ } else {
+ addGoodBlockToLogFile(goodBlocks);
+ }
+ verify[i] = pickCorruptBlock;
+ }
+
+ Reader reader = HoodieLogFormat.newReader(fs, logFile,
SchemaTestUtil.getSimpleSchema());
+ int curr = 0;
+ int dataBlocksGenerated = 0;
+ int dataBlocksSeen = 0;
+ ByteArrayOutputStream bOStream = new ByteArrayOutputStream();
+ IOUtils.copyBytes(fs.open(logPath), bOStream, 4096);
+ try {
+ while (reader.hasNext()) {
+ HoodieLogBlockType generatedType = verify[curr] ?
HoodieLogBlockType.CORRUPT_BLOCK : HoodieLogBlockType.AVRO_DATA_BLOCK;
+ HoodieLogBlock block = reader.next();
+
+ if (block.getBlockType().equals(HoodieLogBlockType.AVRO_DATA_BLOCK))
{
+ HoodieAvroDataBlock blk = (HoodieAvroDataBlock) block;
+ assertEquals(getRecords(blk), goodBlocks.get(dataBlocksGenerated));
+ }
+
+ dataBlocksGenerated +=
(generatedType.equals(HoodieLogBlockType.AVRO_DATA_BLOCK)) ? 1 : 0;
+ dataBlocksSeen +=
(block.getBlockType().equals(HoodieLogBlockType.AVRO_DATA_BLOCK)) ? 1 : 0;
+
+ assertEquals(generatedType, block.getBlockType(),
Arrays.toString(bOStream.toByteArray()));
+
+ curr++;
+ }
+ reader.close();
+ } catch (Exception e) {
+ System.out.println(Arrays.toString(bOStream.toByteArray()));
Review Comment:
Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]