This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 281ef1a4a99 [HUDI-6754] Fix record reader tests in hudi-hadoop-mr (#9535)
281ef1a4a99 is described below

commit 281ef1a4a99e462b6b4f032b23f18f20a20510e5
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Aug 26 01:53:54 2023 +0530

    [HUDI-6754] Fix record reader tests in hudi-hadoop-mr (#9535)
---
 .../realtime/AbstractRealtimeRecordReader.java     |  1 -
 .../hive/TestHoodieCombineHiveInputFormat.java     | 23 +++++---
 .../TestHoodieMergeOnReadSnapshotReader.java       |  6 +++
 .../realtime/TestHoodieRealtimeRecordReader.java   | 44 +++++++++------
 .../hudi/hadoop/testutils/InputFormatTestUtil.java | 63 +++++++++++-----------
 5 files changed, 81 insertions(+), 56 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 04a05a1d6f0..3cd2a5d05d9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -133,7 +133,6 @@ public abstract class AbstractRealtimeRecordReader {
       LOG.warn("fall to init HiveAvroSerializer to support payload merge", e);
       this.supportPayload = false;
     }
-
   }
 
   /**
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
index e8c286d8ab7..22e5389a930 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -84,8 +85,11 @@ public class TestHoodieCombineHiveInputFormat extends 
HoodieCommonTestHarness {
   }
 
   @AfterAll
-  public static void tearDownClass() {
+  public static void tearDownClass() throws IOException {
     hdfsTestService.stop();
+    if (fs != null) {
+      fs.close();
+    }
   }
 
   @BeforeEach
@@ -93,6 +97,13 @@ public class TestHoodieCombineHiveInputFormat extends 
HoodieCommonTestHarness {
     assertTrue(fs.mkdirs(new Path(tempDir.toAbsolutePath().toString())));
   }
 
+  @AfterEach
+  public void tearDown() throws IOException {
+    if (fs != null) {
+      fs.delete(new Path(tempDir.toAbsolutePath().toString()), true);
+    }
+  }
+
   @Test
   public void multiPartitionReadersRealtimeCombineHoodieInputFormat() throws 
Exception {
     // test for HUDI-1718
@@ -154,8 +165,8 @@ public class TestHoodieCombineHiveInputFormat extends 
HoodieCommonTestHarness {
     ArrayWritable arrayWritable = recordReader.createValue();
     int counter = 0;
 
-    HoodieCombineRealtimeHiveSplit hiveSplit = 
(HoodieCombineRealtimeHiveSplit)splits[0];
-    HoodieCombineRealtimeFileSplit fileSplit = 
(HoodieCombineRealtimeFileSplit)hiveSplit.getInputSplitShim();
+    HoodieCombineRealtimeHiveSplit hiveSplit = 
(HoodieCombineRealtimeHiveSplit) splits[0];
+    HoodieCombineRealtimeFileSplit fileSplit = 
(HoodieCombineRealtimeFileSplit) hiveSplit.getInputSplitShim();
     List<FileSplit> realtimeFileSplits = fileSplit.getRealtimeFileSplits();
 
     while (recordReader.next(nullWritable, arrayWritable)) {
@@ -268,8 +279,8 @@ public class TestHoodieCombineHiveInputFormat extends 
HoodieCommonTestHarness {
     // insert 1000 update records to log file 2
     // now fileid0, fileid1 has no log files, fileid2 has log file
     HoodieLogFormat.Writer writer =
-            InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, 
schema, "fileid2", commitTime, newCommitTime,
-                    numRecords, numRecords, 0);
+        InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, 
"fileid2", commitTime, newCommitTime,
+            numRecords, numRecords, 0);
     writer.close();
 
     TableDesc tblDesc = Utilities.defaultTd;
@@ -304,7 +315,7 @@ public class TestHoodieCombineHiveInputFormat extends 
HoodieCommonTestHarness {
     // Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 
file groups
     assertEquals(1, splits.length);
     RecordReader<NullWritable, ArrayWritable> recordReader =
-            combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
+        combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
     NullWritable nullWritable = recordReader.createKey();
     ArrayWritable arrayWritable = recordReader.createValue();
     int counter = 0;
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
index b37b4170a0c..adee06cc20d 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
@@ -43,6 +43,7 @@ import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.jupiter.api.AfterEach;
@@ -67,6 +68,9 @@ public class TestHoodieMergeOnReadSnapshotReader {
 
   private static final int TOTAL_RECORDS = 100;
   private static final String FILE_ID = "fileid0";
+  private static final String COLUMNS =
+      
"_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,field1,field2,name,favorite_number,favorite_color,favorite_movie";
+  private static final String COLUMN_TYPES = 
"string,string,string,string,string,string,string,string,int,string,string";
   private JobConf baseJobConf;
   private FileSystem fs;
   private Configuration hadoopConf;
@@ -81,6 +85,8 @@ public class TestHoodieMergeOnReadSnapshotReader {
     hadoopConf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
     baseJobConf = new JobConf(hadoopConf);
     baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, 
String.valueOf(1024 * 1024));
+    baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS);
+    baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES);
     fs = getFs(basePath.toUri().toString(), baseJobConf);
   }
 
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 9fca206ac26..201b18aaa6d 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -71,8 +71,8 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -101,7 +101,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
-@Disabled("HUDI-6755")
 public class TestHoodieRealtimeRecordReader {
 
   private static final String PARTITION_COLUMN = "datestr";
@@ -119,11 +118,22 @@ public class TestHoodieRealtimeRecordReader {
     fs = FSUtils.getFs(basePath.toUri().toString(), baseJobConf);
   }
 
+  @AfterEach
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(new Path(basePath.toString()), true);
+      fs.close();
+    }
+    if (baseJobConf != null) {
+      baseJobConf.clear();
+    }
+  }
+
   @TempDir
   public java.nio.file.Path basePath;
 
   private Writer writeLogFile(File partitionDir, Schema schema, String fileId, 
String baseCommit, String newCommit,
-      int numberOfRecords) throws InterruptedException, IOException {
+                              int numberOfRecords) throws 
InterruptedException, IOException {
     return InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, 
schema, fileId, baseCommit, newCommit,
         numberOfRecords, 0,
         0);
@@ -171,8 +181,8 @@ public class TestHoodieRealtimeRecordReader {
   }
 
   private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
-                         boolean isCompressionEnabled,
-                         boolean partitioned, 
HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
+                                  boolean isCompressionEnabled,
+                                  boolean partitioned, 
HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
     // initial commit
     Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
     HoodieTestUtils.init(hadoopConf, basePath.toString(), 
HoodieTableType.MERGE_ON_READ);
@@ -612,7 +622,7 @@ public class TestHoodieRealtimeRecordReader {
     String newCommitTime = "101";
     File partitionDir1 =
         InputFormatTestUtil.prepareSimpleParquetTable(basePath, evolvedSchema, 
1, numberOfRecords,
-            instantTime, HoodieTableType.MERGE_ON_READ,"2017","05","01");
+            instantTime, HoodieTableType.MERGE_ON_READ, "2017", "05", "01");
     HoodieCommitMetadata commitMetadata1 = 
CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), 
Option.empty(), WriteOperationType.UPSERT,
         evolvedSchema.toString(), HoodieTimeline.COMMIT_ACTION);
     FileCreateUtils.createCommit(basePath.toString(), newCommitTime, 
Option.of(commitMetadata1));
@@ -665,7 +675,7 @@ public class TestHoodieRealtimeRecordReader {
     final int numRecords = 1000;
     File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, 
schema, 1, numRecords, instantTime,
         HoodieTableType.MERGE_ON_READ);
-    createDeltaCommitFile(basePath, instantTime,"2016/05/01", 
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
+    createDeltaCommitFile(basePath, instantTime, "2016/05/01", 
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
     // Add the paths
     FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
 
@@ -676,11 +686,11 @@ public class TestHoodieRealtimeRecordReader {
           InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, 
schema, "fileid0", instantTime, newCommitTime,
               numRecords, numRecords, 0);
       writer.close();
-      createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", 
"2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0", schema.toString());
+      createDeltaCommitFile(basePath, newCommitTime, "2016/05/01", 
"2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0", schema.toString());
 
       InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1);
 
-      HoodieParquetRealtimeInputFormat inputFormat =  new 
HoodieParquetRealtimeInputFormat();
+      HoodieParquetRealtimeInputFormat inputFormat = new 
HoodieParquetRealtimeInputFormat();
       inputFormat.setConf(baseJobConf);
       InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
       assertEquals(1, splits.length);
@@ -688,7 +698,7 @@ public class TestHoodieRealtimeRecordReader {
       List<Schema.Field> fields = schema.getFields();
       setHiveColumnNameProps(fields, newJobConf, false);
       newJobConf.set("columns.types", 
"string,string,string,string,string,string,string,string,bigint,string,string");
-      RecordReader<NullWritable, ArrayWritable> reader  = 
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
+      RecordReader<NullWritable, ArrayWritable> reader = 
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
       // use reader to read log file.
       NullWritable key = reader.createKey();
       ArrayWritable value = reader.createValue();
@@ -714,21 +724,21 @@ public class TestHoodieRealtimeRecordReader {
     String baseInstant = "100";
     File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, 
schema, 1, 100, baseInstant,
         HoodieTableType.MERGE_ON_READ);
-    createDeltaCommitFile(basePath, baseInstant,"2016/05/01", 
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
+    createDeltaCommitFile(basePath, baseInstant, "2016/05/01", 
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
     // Add the paths
     FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
 
     InputFormatTestUtil.simulateInserts(partitionDir, ".parquet", "fileid1", 
1, "200");
     Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
-    List<String> replacedFileId  = new ArrayList<>();
+    List<String> replacedFileId = new ArrayList<>();
     replacedFileId.add("fileid0");
     partitionToReplaceFileIds.put("2016/05/01", replacedFileId);
     createReplaceCommitFile(basePath,
-        "200","2016/05/01", "2016/05/01/fileid10_1-0-1_200.parquet", 
"fileid10", partitionToReplaceFileIds);
+        "200", "2016/05/01", "2016/05/01/fileid10_1-0-1_200.parquet", 
"fileid10", partitionToReplaceFileIds);
 
     InputFormatTestUtil.setupIncremental(baseJobConf, "0", 1);
 
-    HoodieParquetRealtimeInputFormat inputFormat =  new 
HoodieParquetRealtimeInputFormat();
+    HoodieParquetRealtimeInputFormat inputFormat = new 
HoodieParquetRealtimeInputFormat();
     inputFormat.setConf(baseJobConf);
     InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
     assertTrue(splits.length == 1);
@@ -736,7 +746,7 @@ public class TestHoodieRealtimeRecordReader {
     List<Schema.Field> fields = schema.getFields();
     setHiveColumnNameProps(fields, newJobConf, false);
     newJobConf.set("columns.types", 
"string,string,string,string,string,string,string,string,bigint,string,string");
-    RecordReader<NullWritable, ArrayWritable> reader  = 
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
+    RecordReader<NullWritable, ArrayWritable> reader = 
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
 
     // use reader to read log file.
     NullWritable key = reader.createKey();
@@ -883,7 +893,7 @@ public class TestHoodieRealtimeRecordReader {
     String baseInstant = "100";
     File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, 
schema, 1, 100, baseInstant,
         HoodieTableType.MERGE_ON_READ);
-    createDeltaCommitFile(basePath, baseInstant,"2016/05/01", 
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
+    createDeltaCommitFile(basePath, baseInstant, "2016/05/01", 
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
     // Add the paths
     FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
 
@@ -896,7 +906,7 @@ public class TestHoodieRealtimeRecordReader {
     InputFormatTestUtil.setupIncremental(baseJobConf, "100", 10);
 
     // verify that incremental reads do NOT show inserts after compaction 
timestamp
-    HoodieParquetRealtimeInputFormat inputFormat =  new 
HoodieParquetRealtimeInputFormat();
+    HoodieParquetRealtimeInputFormat inputFormat = new 
HoodieParquetRealtimeInputFormat();
     inputFormat.setConf(baseJobConf);
     InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
     assertTrue(splits.length == 0);
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index c79fe436f95..4207e3bf113 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -77,7 +77,7 @@ public class InputFormatTestUtil {
   }
 
   public static File prepareCustomizedTable(java.nio.file.Path basePath, 
HoodieFileFormat baseFileFormat, int numberOfFiles,
-                                  String commitNumber, boolean 
useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema 
schema)
+                                            String commitNumber, boolean 
useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema 
schema)
       throws IOException {
     if (useNonPartitionedKeyGen) {
       HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), 
basePath.toString(), HoodieTableType.COPY_ON_WRITE,
@@ -107,7 +107,7 @@ public class InputFormatTestUtil {
   }
 
   public static File prepareMultiPartitionTable(java.nio.file.Path basePath, 
HoodieFileFormat baseFileFormat, int numberOfFiles,
-                                  String commitNumber, String 
finalLevelPartitionName)
+                                                String commitNumber, String 
finalLevelPartitionName)
       throws IOException {
     HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), 
basePath.toString(), HoodieTableType.COPY_ON_WRITE,
         baseFileFormat);
@@ -178,15 +178,15 @@ public class InputFormatTestUtil {
 
   public static void setupIncremental(JobConf jobConf, String startCommit, int 
numberOfCommitsToPull, String databaseName, boolean isIncrementalUseDatabase) {
     String modePropertyName =
-            String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, 
databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, 
databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
 
     String startCommitTimestampName =
-            String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, 
databaseName + "."  + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, 
databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.set(startCommitTimestampName, startCommit);
 
     String maxCommitPulls =
-            String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, 
databaseName + "."  + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, databaseName 
+ "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
 
     jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, 
isIncrementalUseDatabase);
@@ -202,7 +202,7 @@ public class InputFormatTestUtil {
   public static void setupSnapshotMaxCommitTimeQueryMode(JobConf jobConf, 
String maxInstantTime) {
     setUpScanMode(jobConf);
     String validateTimestampName =
-            String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, 
HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, 
HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.set(validateTimestampName, maxInstantTime);
   }
 
@@ -224,7 +224,7 @@ public class InputFormatTestUtil {
   }
 
   public static File prepareParquetTable(java.nio.file.Path basePath, Schema 
schema, int numberOfFiles,
-      int numberOfRecords, String commitNumber) throws IOException {
+                                         int numberOfRecords, String 
commitNumber) throws IOException {
     return prepareParquetTable(basePath, schema, numberOfFiles, 
numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE);
   }
 
@@ -241,13 +241,13 @@ public class InputFormatTestUtil {
   }
 
   public static File prepareSimpleParquetTable(java.nio.file.Path basePath, 
Schema schema, int numberOfFiles,
-      int numberOfRecords, String commitNumber) throws Exception {
+                                               int numberOfRecords, String 
commitNumber) throws Exception {
     return prepareSimpleParquetTable(basePath, schema, numberOfFiles, 
numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE);
   }
 
   public static File prepareSimpleParquetTable(java.nio.file.Path basePath, 
Schema schema, int numberOfFiles,
                                                int numberOfRecords, String 
commitNumber, HoodieTableType tableType) throws Exception {
-    return prepareSimpleParquetTable(basePath, schema, numberOfFiles, 
numberOfRecords, commitNumber, tableType, "2016","05","01");
+    return prepareSimpleParquetTable(basePath, schema, numberOfFiles, 
numberOfRecords, commitNumber, tableType, "2016", "05", "01");
   }
 
   public static File prepareSimpleParquetTable(java.nio.file.Path basePath, 
Schema schema, int numberOfFiles,
@@ -263,7 +263,7 @@ public class InputFormatTestUtil {
   }
 
   public static File prepareNonPartitionedParquetTable(java.nio.file.Path 
basePath, Schema schema, int numberOfFiles,
-      int numberOfRecords, String commitNumber) throws IOException {
+                                                       int numberOfRecords, 
String commitNumber) throws IOException {
     return prepareNonPartitionedParquetTable(basePath, schema, numberOfFiles, 
numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE);
   }
 
@@ -275,7 +275,7 @@ public class InputFormatTestUtil {
   }
 
   public static List<File> 
prepareMultiPartitionedParquetTable(java.nio.file.Path basePath, Schema schema,
-      int numberPartitions, int numberOfRecordsPerPartition, String 
commitNumber, HoodieTableType tableType) throws IOException {
+                                                               int 
numberPartitions, int numberOfRecordsPerPartition, String commitNumber, 
HoodieTableType tableType) throws IOException {
     List<File> result = new ArrayList<>();
     HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), 
basePath.toString(), tableType, HoodieFileFormat.PARQUET);
     for (int i = 0; i < numberPartitions; i++) {
@@ -290,7 +290,7 @@ public class InputFormatTestUtil {
   }
 
   private static void createData(Schema schema, java.nio.file.Path 
partitionPath, int numberOfFiles, int numberOfRecords,
-      String commitNumber) throws IOException {
+                                 String commitNumber) throws IOException {
     AvroParquetWriter parquetWriter;
     for (int i = 0; i < numberOfFiles; i++) {
       String fileId = FSUtils.makeBaseFileName(commitNumber, TEST_WRITE_TOKEN, 
"fileid" + i, HoodieFileFormat.PARQUET.getFileExtension());
@@ -305,8 +305,7 @@ public class InputFormatTestUtil {
     }
   }
 
-  private static void createSimpleData(Schema schema, java.nio.file.Path 
partitionPath, int numberOfFiles, int numberOfRecords,
-      String commitNumber) throws Exception {
+  private static void createSimpleData(Schema schema, java.nio.file.Path 
partitionPath, int numberOfFiles, int numberOfRecords, String commitNumber) 
throws Exception {
     AvroParquetWriter parquetWriter;
     for (int i = 0; i < numberOfFiles; i++) {
       String fileId = FSUtils.makeBaseFileName(commitNumber, "1", "fileid" + 
i, HoodieFileFormat.PARQUET.getFileExtension());
@@ -328,7 +327,7 @@ public class InputFormatTestUtil {
   }
 
   private static Iterable<? extends GenericRecord> generateAvroRecords(Schema 
schema, int numberOfRecords,
-      String instantTime, String fileId) throws IOException {
+                                                                       String 
instantTime, String fileId) throws IOException {
     List<GenericRecord> records = new ArrayList<>(numberOfRecords);
     for (int i = 0; i < numberOfRecords; i++) {
       records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, 
instantTime, fileId));
@@ -337,7 +336,7 @@ public class InputFormatTestUtil {
   }
 
   public static void simulateParquetUpdates(File directory, Schema schema, 
String originalCommit,
-      int totalNumberOfRecords, int numberOfRecordsToUpdate, String newCommit) 
throws IOException {
+                                            int totalNumberOfRecords, int 
numberOfRecordsToUpdate, String newCommit) throws IOException {
     File fileToUpdate = Objects.requireNonNull(directory.listFiles((dir, name) 
-> name.endsWith("parquet")))[0];
     String fileId = FSUtils.getFileId(fileToUpdate.getName());
     File dataFile = new File(directory,
@@ -410,8 +409,7 @@ public class InputFormatTestUtil {
   }
 
   public static HoodieLogFormat.Writer writeRollbackBlockToLogFile(File 
partitionDir, FileSystem fs, Schema schema,
-      String
-          fileId, String baseCommit, String newCommit, String oldCommit, int 
logVersion)
+                                                                   String 
fileId, String baseCommit, String newCommit, String oldCommit, int logVersion)
       throws InterruptedException, IOException {
     HoodieLogFormat.Writer writer = 
HoodieLogFormat.newWriterBuilder().onParentPath(new 
Path(partitionDir.getPath()))
         
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).overBaseCommit(baseCommit)
@@ -429,7 +427,7 @@ public class InputFormatTestUtil {
   }
 
   public static void setProjectFieldsForInputFormat(JobConf jobConf,
-      Schema schema, String hiveColumnTypes) {
+                                                    Schema schema, String 
hiveColumnTypes) {
     List<Schema.Field> fields = schema.getFields();
     String names = fields.stream().map(f -> 
f.name().toString()).collect(Collectors.joining(","));
     String positions = fields.stream().map(f -> 
String.valueOf(f.pos())).collect(Collectors.joining(","));
@@ -456,7 +454,7 @@ public class InputFormatTestUtil {
   }
 
   public static void setPropsForInputFormat(JobConf jobConf,
-      Schema schema, String hiveColumnTypes) {
+                                            Schema schema, String 
hiveColumnTypes) {
     List<Schema.Field> fields = schema.getFields();
     String names = fields.stream().map(f -> 
f.name().toString()).collect(Collectors.joining(","));
     String positions = fields.stream().map(f -> 
String.valueOf(f.pos())).collect(Collectors.joining(","));
@@ -484,18 +482,19 @@ public class InputFormatTestUtil {
     Files.createDirectories(partitionPath);
 
     // Create partition metadata to properly setup table's partition
-    RawLocalFileSystem lfs = new RawLocalFileSystem();
-    lfs.setConf(HoodieTestUtils.getDefaultHadoopConf());
-
-    HoodiePartitionMetadata partitionMetadata =
-        new HoodiePartitionMetadata(
-            new LocalFileSystem(lfs),
-            "0",
-            new Path(basePath.toAbsolutePath().toString()),
-            new Path(partitionPath.toAbsolutePath().toString()),
-            Option.of(HoodieFileFormat.PARQUET));
-
-    partitionMetadata.trySave((int) (Math.random() * 1000));
+    try (RawLocalFileSystem lfs = new RawLocalFileSystem()) {
+      lfs.setConf(HoodieTestUtils.getDefaultHadoopConf());
+
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              new LocalFileSystem(lfs),
+              "0",
+              new Path(basePath.toAbsolutePath().toString()),
+              new Path(partitionPath.toAbsolutePath().toString()),
+              Option.of(HoodieFileFormat.PARQUET));
+
+      partitionMetadata.trySave((int) (Math.random() * 1000));
+    }
   }
 
   public static void setInputPath(JobConf jobConf, String inputPath) {

Reply via email to