This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0d8c34f24da769cd9b0be5f764f897654f9b2b9c Author: Sagar Sumit <[email protected]> AuthorDate: Sat Aug 26 01:53:54 2023 +0530 [HUDI-6754] Fix record reader tests in hudi-hadoop-mr (#9535) --- .../realtime/AbstractRealtimeRecordReader.java | 1 - .../hive/TestHoodieCombineHiveInputFormat.java | 23 +++++--- .../TestHoodieMergeOnReadSnapshotReader.java | 6 +++ .../realtime/TestHoodieRealtimeRecordReader.java | 44 +++++++++------ .../hudi/hadoop/testutils/InputFormatTestUtil.java | 63 +++++++++++----------- 5 files changed, 81 insertions(+), 56 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 04a05a1d6f0..3cd2a5d05d9 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -133,7 +133,6 @@ public abstract class AbstractRealtimeRecordReader { LOG.warn("fall to init HiveAvroSerializer to support payload merge", e); this.supportPayload = false; } - } /** diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java index e8c286d8ab7..22e5389a930 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java @@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -84,8 +85,11 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { } @AfterAll - public static void tearDownClass() { + public static void tearDownClass() throws IOException { hdfsTestService.stop(); + if (fs != null) { + fs.close(); + } } @BeforeEach @@ -93,6 +97,13 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { assertTrue(fs.mkdirs(new Path(tempDir.toAbsolutePath().toString()))); } + @AfterEach + public void tearDown() throws IOException { + if (fs != null) { + fs.delete(new Path(tempDir.toAbsolutePath().toString()), true); + } + } + @Test public void multiPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception { // test for HUDI-1718 @@ -154,8 +165,8 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { ArrayWritable arrayWritable = recordReader.createValue(); int counter = 0; - HoodieCombineRealtimeHiveSplit hiveSplit = (HoodieCombineRealtimeHiveSplit)splits[0]; - HoodieCombineRealtimeFileSplit fileSplit = (HoodieCombineRealtimeFileSplit)hiveSplit.getInputSplitShim(); + HoodieCombineRealtimeHiveSplit hiveSplit = (HoodieCombineRealtimeHiveSplit) splits[0]; + HoodieCombineRealtimeFileSplit fileSplit = (HoodieCombineRealtimeFileSplit) hiveSplit.getInputSplitShim(); List<FileSplit> realtimeFileSplits = fileSplit.getRealtimeFileSplits(); while (recordReader.next(nullWritable, arrayWritable)) { @@ -268,8 +279,8 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { // insert 1000 update records to log file 2 // now fileid0, fileid1 has no log files, fileid2 has log file HoodieLogFormat.Writer writer = - InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime, - numRecords, numRecords, 0); + InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime, + numRecords, numRecords, 0); writer.close(); TableDesc tblDesc = Utilities.defaultTd; @@ -304,7 +315,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { // Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 file groups assertEquals(1, splits.length); RecordReader<NullWritable, ArrayWritable> recordReader = - combineHiveInputFormat.getRecordReader(splits[0], jobConf, null); + combineHiveInputFormat.getRecordReader(splits[0], jobConf, null); NullWritable nullWritable = recordReader.createKey(); ArrayWritable arrayWritable = recordReader.createValue(); int counter = 0; diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java index b37b4170a0c..adee06cc20d 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java @@ -43,6 +43,7 @@ import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; import org.junit.jupiter.api.AfterEach; @@ -67,6 +68,9 @@ public class TestHoodieMergeOnReadSnapshotReader { private static final int TOTAL_RECORDS = 100; private static final String FILE_ID = "fileid0"; + private static final String COLUMNS = + "_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,field1,field2,name,favorite_number,favorite_color,favorite_movie"; + private static final String COLUMN_TYPES = "string,string,string,string,string,string,string,string,int,string,string"; private JobConf baseJobConf; private FileSystem fs; private Configuration hadoopConf; @@ -81,6 +85,8 @@ public class TestHoodieMergeOnReadSnapshotReader { hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); baseJobConf = new JobConf(hadoopConf); baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024)); + baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS); + baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES); fs = getFs(basePath.toUri().toString(), baseJobConf); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index 9fca206ac26..201b18aaa6d 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -71,8 +71,8 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -101,7 +101,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; -@Disabled("HUDI-6755") public class TestHoodieRealtimeRecordReader { private static final String PARTITION_COLUMN = "datestr"; @@ -119,11 +118,22 @@ public class TestHoodieRealtimeRecordReader { fs = FSUtils.getFs(basePath.toUri().toString(), baseJobConf); } + @AfterEach + public void tearDown() throws Exception { + if (fs != null) { + fs.delete(new Path(basePath.toString()), true); + fs.close(); + } + if (baseJobConf != null) { + baseJobConf.clear(); + } + } + @TempDir public java.nio.file.Path basePath; private Writer writeLogFile(File partitionDir, Schema schema, String fileId, String baseCommit, String newCommit, - int numberOfRecords) throws InterruptedException, IOException { + int numberOfRecords) throws InterruptedException, IOException { return InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, fileId, baseCommit, newCommit, numberOfRecords, 0, 0); @@ -171,8 +181,8 @@ public class TestHoodieRealtimeRecordReader { } private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType, - boolean isCompressionEnabled, - boolean partitioned, HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception { + boolean isCompressionEnabled, + boolean partitioned, HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception { // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); @@ -612,7 +622,7 @@ public class TestHoodieRealtimeRecordReader { String newCommitTime = "101"; File partitionDir1 = InputFormatTestUtil.prepareSimpleParquetTable(basePath, evolvedSchema, 1, numberOfRecords, - instantTime, HoodieTableType.MERGE_ON_READ,"2017","05","01"); + instantTime, HoodieTableType.MERGE_ON_READ, "2017", "05", "01"); HoodieCommitMetadata commitMetadata1 = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, evolvedSchema.toString(), HoodieTimeline.COMMIT_ACTION); FileCreateUtils.createCommit(basePath.toString(), newCommitTime, Option.of(commitMetadata1)); @@ -665,7 +675,7 @@ public class TestHoodieRealtimeRecordReader { final int numRecords = 1000; File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime, HoodieTableType.MERGE_ON_READ); - createDeltaCommitFile(basePath, instantTime,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); + createDeltaCommitFile(basePath, instantTime, "2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); // Add the paths FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); @@ -676,11 +686,11 @@ public class TestHoodieRealtimeRecordReader { InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, numRecords, numRecords, 0); writer.close(); - createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", "2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0", schema.toString()); + createDeltaCommitFile(basePath, newCommitTime, "2016/05/01", "2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0", schema.toString()); InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1); - HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); + HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); inputFormat.setConf(baseJobConf); InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); assertEquals(1, splits.length); @@ -688,7 +698,7 @@ public class TestHoodieRealtimeRecordReader { List<Schema.Field> fields = schema.getFields(); setHiveColumnNameProps(fields, newJobConf, false); newJobConf.set("columns.types", "string,string,string,string,string,string,string,string,bigint,string,string"); - RecordReader<NullWritable, ArrayWritable> reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL); + RecordReader<NullWritable, ArrayWritable> reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL); // use reader to read log file. NullWritable key = reader.createKey(); ArrayWritable value = reader.createValue(); @@ -714,21 +724,21 @@ public class TestHoodieRealtimeRecordReader { String baseInstant = "100"; File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant, HoodieTableType.MERGE_ON_READ); - createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); + createDeltaCommitFile(basePath, baseInstant, "2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); // Add the paths FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); InputFormatTestUtil.simulateInserts(partitionDir, ".parquet", "fileid1", 1, "200"); Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>(); - List<String> replacedFileId = new ArrayList<>(); + List<String> replacedFileId = new ArrayList<>(); replacedFileId.add("fileid0"); partitionToReplaceFileIds.put("2016/05/01", replacedFileId); createReplaceCommitFile(basePath, - "200","2016/05/01", "2016/05/01/fileid10_1-0-1_200.parquet", "fileid10", partitionToReplaceFileIds); + "200", "2016/05/01", "2016/05/01/fileid10_1-0-1_200.parquet", "fileid10", partitionToReplaceFileIds); InputFormatTestUtil.setupIncremental(baseJobConf, "0", 1); - HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); + HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); inputFormat.setConf(baseJobConf); InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); assertTrue(splits.length == 1); @@ -736,7 +746,7 @@ public class TestHoodieRealtimeRecordReader { List<Schema.Field> fields = schema.getFields(); setHiveColumnNameProps(fields, newJobConf, false); newJobConf.set("columns.types", "string,string,string,string,string,string,string,string,bigint,string,string"); - RecordReader<NullWritable, ArrayWritable> reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL); + RecordReader<NullWritable, ArrayWritable> reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL); // use reader to read log file. NullWritable key = reader.createKey(); @@ -883,7 +893,7 @@ public class TestHoodieRealtimeRecordReader { String baseInstant = "100"; File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant, HoodieTableType.MERGE_ON_READ); - createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); + createDeltaCommitFile(basePath, baseInstant, "2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); // Add the paths FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); @@ -896,7 +906,7 @@ public class TestHoodieRealtimeRecordReader { InputFormatTestUtil.setupIncremental(baseJobConf, "100", 10); // verify that incremental reads do NOT show inserts after compaction timestamp - HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); + HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); inputFormat.setConf(baseJobConf); InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); assertTrue(splits.length == 0); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index c79fe436f95..4207e3bf113 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -77,7 +77,7 @@ public class InputFormatTestUtil { } public static File prepareCustomizedTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles, - String commitNumber, boolean useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema schema) + String commitNumber, boolean useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema schema) throws IOException { if (useNonPartitionedKeyGen) { HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, @@ -107,7 +107,7 @@ public class InputFormatTestUtil { } public static File prepareMultiPartitionTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles, - String commitNumber, String finalLevelPartitionName) + String commitNumber, String finalLevelPartitionName) throws IOException { HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, baseFileFormat); @@ -178,15 +178,15 @@ public class InputFormatTestUtil { public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, String databaseName, boolean isIncrementalUseDatabase) { String modePropertyName = - String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); String startCommitTimestampName = - String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.set(startCommitTimestampName, startCommit); String maxCommitPulls = - String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, isIncrementalUseDatabase); @@ -202,7 +202,7 @@ public class InputFormatTestUtil { public static void setupSnapshotMaxCommitTimeQueryMode(JobConf jobConf, String maxInstantTime) { setUpScanMode(jobConf); String validateTimestampName = - String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.set(validateTimestampName, maxInstantTime); } @@ -224,7 +224,7 @@ public class InputFormatTestUtil { } public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, - int numberOfRecords, String commitNumber) throws IOException { + int numberOfRecords, String commitNumber) throws IOException { return prepareParquetTable(basePath, schema, numberOfFiles, numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE); } @@ -241,13 +241,13 @@ public class InputFormatTestUtil { } public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, - int numberOfRecords, String commitNumber) throws Exception { + int numberOfRecords, String commitNumber) throws Exception { return prepareSimpleParquetTable(basePath, schema, numberOfFiles, numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE); } public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, int numberOfRecords, String commitNumber, HoodieTableType tableType) throws Exception { - return prepareSimpleParquetTable(basePath, schema, numberOfFiles, numberOfRecords, commitNumber, tableType, "2016","05","01"); + return prepareSimpleParquetTable(basePath, schema, numberOfFiles, numberOfRecords, commitNumber, tableType, "2016", "05", "01"); } public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, @@ -263,7 +263,7 @@ public class InputFormatTestUtil { } public static File prepareNonPartitionedParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, - int numberOfRecords, String commitNumber) throws IOException { + int numberOfRecords, String commitNumber) throws IOException { return prepareNonPartitionedParquetTable(basePath, schema, numberOfFiles, numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE); } @@ -275,7 +275,7 @@ public class InputFormatTestUtil { } public static List<File> prepareMultiPartitionedParquetTable(java.nio.file.Path basePath, Schema schema, - int numberPartitions, int numberOfRecordsPerPartition, String commitNumber, HoodieTableType tableType) throws IOException { + int numberPartitions, int numberOfRecordsPerPartition, String commitNumber, HoodieTableType tableType) throws IOException { List<File> result = new ArrayList<>(); HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET); for (int i = 0; i < numberPartitions; i++) { @@ -290,7 +290,7 @@ public class InputFormatTestUtil { } private static void createData(Schema schema, java.nio.file.Path partitionPath, int numberOfFiles, int numberOfRecords, - String commitNumber) throws IOException { + String commitNumber) throws IOException { AvroParquetWriter parquetWriter; for (int i = 0; i < numberOfFiles; i++) { String fileId = FSUtils.makeBaseFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i, HoodieFileFormat.PARQUET.getFileExtension()); @@ -305,8 +305,7 @@ public class InputFormatTestUtil { } } - private static void createSimpleData(Schema schema, java.nio.file.Path partitionPath, int numberOfFiles, int numberOfRecords, - String commitNumber) throws Exception { + private static void createSimpleData(Schema schema, java.nio.file.Path partitionPath, int numberOfFiles, int numberOfRecords, String commitNumber) throws Exception { AvroParquetWriter parquetWriter; for (int i = 0; i < numberOfFiles; i++) { String fileId = FSUtils.makeBaseFileName(commitNumber, "1", "fileid" + i, HoodieFileFormat.PARQUET.getFileExtension()); @@ -328,7 +327,7 @@ public class InputFormatTestUtil { } private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, - String instantTime, String fileId) throws IOException { + String instantTime, String fileId) throws IOException { List<GenericRecord> records = new ArrayList<>(numberOfRecords); for (int i = 0; i < numberOfRecords; i++) { records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, instantTime, fileId)); @@ -337,7 +336,7 @@ public class InputFormatTestUtil { } public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit, - int totalNumberOfRecords, int numberOfRecordsToUpdate, String newCommit) throws IOException { + int totalNumberOfRecords, int numberOfRecordsToUpdate, String newCommit) throws IOException { File fileToUpdate = Objects.requireNonNull(directory.listFiles((dir, name) -> name.endsWith("parquet")))[0]; String fileId = FSUtils.getFileId(fileToUpdate.getName()); File dataFile = new File(directory, @@ -410,8 +409,7 @@ public class InputFormatTestUtil { } public static HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionDir, FileSystem fs, Schema schema, - String - fileId, String baseCommit, String newCommit, String oldCommit, int logVersion) + String fileId, String baseCommit, String newCommit, String oldCommit, int logVersion) throws InterruptedException, IOException { HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).overBaseCommit(baseCommit) @@ -429,7 +427,7 @@ public class InputFormatTestUtil { } public static void setProjectFieldsForInputFormat(JobConf jobConf, - Schema schema, String hiveColumnTypes) { + Schema schema, String hiveColumnTypes) { List<Schema.Field> fields = schema.getFields(); String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(",")); String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); @@ -456,7 +454,7 @@ public class InputFormatTestUtil { } public static void setPropsForInputFormat(JobConf jobConf, - Schema schema, String hiveColumnTypes) { + Schema schema, String hiveColumnTypes) { List<Schema.Field> fields = schema.getFields(); String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(",")); String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); @@ -484,18 +482,19 @@ public class InputFormatTestUtil { Files.createDirectories(partitionPath); // Create partition metadata to properly setup table's partition - RawLocalFileSystem lfs = new RawLocalFileSystem(); - lfs.setConf(HoodieTestUtils.getDefaultHadoopConf()); - - HoodiePartitionMetadata partitionMetadata = - new HoodiePartitionMetadata( - new LocalFileSystem(lfs), - "0", - new Path(basePath.toAbsolutePath().toString()), - new Path(partitionPath.toAbsolutePath().toString()), - Option.of(HoodieFileFormat.PARQUET)); - - partitionMetadata.trySave((int) (Math.random() * 1000)); + try (RawLocalFileSystem lfs = new RawLocalFileSystem()) { + lfs.setConf(HoodieTestUtils.getDefaultHadoopConf()); + + HoodiePartitionMetadata partitionMetadata = + new HoodiePartitionMetadata( + new LocalFileSystem(lfs), + "0", + new Path(basePath.toAbsolutePath().toString()), + new Path(partitionPath.toAbsolutePath().toString()), + Option.of(HoodieFileFormat.PARQUET)); + + partitionMetadata.trySave((int) (Math.random() * 1000)); + } } public static void setInputPath(JobConf jobConf, String inputPath) {
