This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 281ef1a4a99 [HUDI-6754] Fix record reader tests in hudi-hadoop-mr (#9535)
281ef1a4a99 is described below
commit 281ef1a4a99e462b6b4f032b23f18f20a20510e5
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Aug 26 01:53:54 2023 +0530
[HUDI-6754] Fix record reader tests in hudi-hadoop-mr (#9535)
---
.../realtime/AbstractRealtimeRecordReader.java | 1 -
.../hive/TestHoodieCombineHiveInputFormat.java | 23 +++++---
.../TestHoodieMergeOnReadSnapshotReader.java | 6 +++
.../realtime/TestHoodieRealtimeRecordReader.java | 44 +++++++++------
.../hudi/hadoop/testutils/InputFormatTestUtil.java | 63 +++++++++++-----------
5 files changed, 81 insertions(+), 56 deletions(-)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 04a05a1d6f0..3cd2a5d05d9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -133,7 +133,6 @@ public abstract class AbstractRealtimeRecordReader {
LOG.warn("fall to init HiveAvroSerializer to support payload merge", e);
this.supportPayload = false;
}
-
}
/**
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
index e8c286d8ab7..22e5389a930 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -84,8 +85,11 @@ public class TestHoodieCombineHiveInputFormat extends
HoodieCommonTestHarness {
}
@AfterAll
- public static void tearDownClass() {
+ public static void tearDownClass() throws IOException {
hdfsTestService.stop();
+ if (fs != null) {
+ fs.close();
+ }
}
@BeforeEach
@@ -93,6 +97,13 @@ public class TestHoodieCombineHiveInputFormat extends
HoodieCommonTestHarness {
assertTrue(fs.mkdirs(new Path(tempDir.toAbsolutePath().toString())));
}
+ @AfterEach
+ public void tearDown() throws IOException {
+ if (fs != null) {
+ fs.delete(new Path(tempDir.toAbsolutePath().toString()), true);
+ }
+ }
+
@Test
public void multiPartitionReadersRealtimeCombineHoodieInputFormat() throws
Exception {
// test for HUDI-1718
@@ -154,8 +165,8 @@ public class TestHoodieCombineHiveInputFormat extends
HoodieCommonTestHarness {
ArrayWritable arrayWritable = recordReader.createValue();
int counter = 0;
- HoodieCombineRealtimeHiveSplit hiveSplit =
(HoodieCombineRealtimeHiveSplit)splits[0];
- HoodieCombineRealtimeFileSplit fileSplit =
(HoodieCombineRealtimeFileSplit)hiveSplit.getInputSplitShim();
+ HoodieCombineRealtimeHiveSplit hiveSplit =
(HoodieCombineRealtimeHiveSplit) splits[0];
+ HoodieCombineRealtimeFileSplit fileSplit =
(HoodieCombineRealtimeFileSplit) hiveSplit.getInputSplitShim();
List<FileSplit> realtimeFileSplits = fileSplit.getRealtimeFileSplits();
while (recordReader.next(nullWritable, arrayWritable)) {
@@ -268,8 +279,8 @@ public class TestHoodieCombineHiveInputFormat extends
HoodieCommonTestHarness {
// insert 1000 update records to log file 2
// now fileid0, fileid1 has no log files, fileid2 has log file
HoodieLogFormat.Writer writer =
- InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs,
schema, "fileid2", commitTime, newCommitTime,
- numRecords, numRecords, 0);
+ InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema,
"fileid2", commitTime, newCommitTime,
+ numRecords, numRecords, 0);
writer.close();
TableDesc tblDesc = Utilities.defaultTd;
@@ -304,7 +315,7 @@ public class TestHoodieCombineHiveInputFormat extends
HoodieCommonTestHarness {
// Since the SPLIT_SIZE is 3, we should create only 1 split with all 3
file groups
assertEquals(1, splits.length);
RecordReader<NullWritable, ArrayWritable> recordReader =
- combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
+ combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
NullWritable nullWritable = recordReader.createKey();
ArrayWritable arrayWritable = recordReader.createValue();
int counter = 0;
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
index b37b4170a0c..adee06cc20d 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
@@ -43,6 +43,7 @@ import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.junit.jupiter.api.AfterEach;
@@ -67,6 +68,9 @@ public class TestHoodieMergeOnReadSnapshotReader {
private static final int TOTAL_RECORDS = 100;
private static final String FILE_ID = "fileid0";
+ private static final String COLUMNS =
+
"_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,field1,field2,name,favorite_number,favorite_color,favorite_movie";
+ private static final String COLUMN_TYPES =
"string,string,string,string,string,string,string,string,int,string,string";
private JobConf baseJobConf;
private FileSystem fs;
private Configuration hadoopConf;
@@ -81,6 +85,8 @@ public class TestHoodieMergeOnReadSnapshotReader {
hadoopConf.set("fs.file.impl",
org.apache.hadoop.fs.LocalFileSystem.class.getName());
baseJobConf = new JobConf(hadoopConf);
baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
String.valueOf(1024 * 1024));
+ baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS);
+ baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES);
fs = getFs(basePath.toUri().toString(), baseJobConf);
}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 9fca206ac26..201b18aaa6d 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -71,8 +71,8 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -101,7 +101,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
-@Disabled("HUDI-6755")
public class TestHoodieRealtimeRecordReader {
private static final String PARTITION_COLUMN = "datestr";
@@ -119,11 +118,22 @@ public class TestHoodieRealtimeRecordReader {
fs = FSUtils.getFs(basePath.toUri().toString(), baseJobConf);
}
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (fs != null) {
+ fs.delete(new Path(basePath.toString()), true);
+ fs.close();
+ }
+ if (baseJobConf != null) {
+ baseJobConf.clear();
+ }
+ }
+
@TempDir
public java.nio.file.Path basePath;
private Writer writeLogFile(File partitionDir, Schema schema, String fileId,
String baseCommit, String newCommit,
- int numberOfRecords) throws InterruptedException, IOException {
+ int numberOfRecords) throws
InterruptedException, IOException {
return InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs,
schema, fileId, baseCommit, newCommit,
numberOfRecords, 0,
0);
@@ -171,8 +181,8 @@ public class TestHoodieRealtimeRecordReader {
}
private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
- boolean isCompressionEnabled,
- boolean partitioned,
HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
+ boolean isCompressionEnabled,
+ boolean partitioned,
HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
// initial commit
Schema schema =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(),
HoodieTableType.MERGE_ON_READ);
@@ -612,7 +622,7 @@ public class TestHoodieRealtimeRecordReader {
String newCommitTime = "101";
File partitionDir1 =
InputFormatTestUtil.prepareSimpleParquetTable(basePath, evolvedSchema,
1, numberOfRecords,
- instantTime, HoodieTableType.MERGE_ON_READ,"2017","05","01");
+ instantTime, HoodieTableType.MERGE_ON_READ, "2017", "05", "01");
HoodieCommitMetadata commitMetadata1 =
CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(),
Option.empty(), WriteOperationType.UPSERT,
evolvedSchema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(basePath.toString(), newCommitTime,
Option.of(commitMetadata1));
@@ -665,7 +675,7 @@ public class TestHoodieRealtimeRecordReader {
final int numRecords = 1000;
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath,
schema, 1, numRecords, instantTime,
HoodieTableType.MERGE_ON_READ);
- createDeltaCommitFile(basePath, instantTime,"2016/05/01",
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
+ createDeltaCommitFile(basePath, instantTime, "2016/05/01",
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
@@ -676,11 +686,11 @@ public class TestHoodieRealtimeRecordReader {
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs,
schema, "fileid0", instantTime, newCommitTime,
numRecords, numRecords, 0);
writer.close();
- createDeltaCommitFile(basePath, newCommitTime,"2016/05/01",
"2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0", schema.toString());
+ createDeltaCommitFile(basePath, newCommitTime, "2016/05/01",
"2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0", schema.toString());
InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1);
- HoodieParquetRealtimeInputFormat inputFormat = new
HoodieParquetRealtimeInputFormat();
+ HoodieParquetRealtimeInputFormat inputFormat = new
HoodieParquetRealtimeInputFormat();
inputFormat.setConf(baseJobConf);
InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
assertEquals(1, splits.length);
@@ -688,7 +698,7 @@ public class TestHoodieRealtimeRecordReader {
List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, newJobConf, false);
newJobConf.set("columns.types",
"string,string,string,string,string,string,string,string,bigint,string,string");
- RecordReader<NullWritable, ArrayWritable> reader =
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
+ RecordReader<NullWritable, ArrayWritable> reader =
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
// use reader to read log file.
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
@@ -714,21 +724,21 @@ public class TestHoodieRealtimeRecordReader {
String baseInstant = "100";
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath,
schema, 1, 100, baseInstant,
HoodieTableType.MERGE_ON_READ);
- createDeltaCommitFile(basePath, baseInstant,"2016/05/01",
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
+ createDeltaCommitFile(basePath, baseInstant, "2016/05/01",
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
InputFormatTestUtil.simulateInserts(partitionDir, ".parquet", "fileid1",
1, "200");
Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
- List<String> replacedFileId = new ArrayList<>();
+ List<String> replacedFileId = new ArrayList<>();
replacedFileId.add("fileid0");
partitionToReplaceFileIds.put("2016/05/01", replacedFileId);
createReplaceCommitFile(basePath,
- "200","2016/05/01", "2016/05/01/fileid10_1-0-1_200.parquet",
"fileid10", partitionToReplaceFileIds);
+ "200", "2016/05/01", "2016/05/01/fileid10_1-0-1_200.parquet",
"fileid10", partitionToReplaceFileIds);
InputFormatTestUtil.setupIncremental(baseJobConf, "0", 1);
- HoodieParquetRealtimeInputFormat inputFormat = new
HoodieParquetRealtimeInputFormat();
+ HoodieParquetRealtimeInputFormat inputFormat = new
HoodieParquetRealtimeInputFormat();
inputFormat.setConf(baseJobConf);
InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
assertTrue(splits.length == 1);
@@ -736,7 +746,7 @@ public class TestHoodieRealtimeRecordReader {
List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, newJobConf, false);
newJobConf.set("columns.types",
"string,string,string,string,string,string,string,string,bigint,string,string");
- RecordReader<NullWritable, ArrayWritable> reader =
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
+ RecordReader<NullWritable, ArrayWritable> reader =
inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
// use reader to read log file.
NullWritable key = reader.createKey();
@@ -883,7 +893,7 @@ public class TestHoodieRealtimeRecordReader {
String baseInstant = "100";
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath,
schema, 1, 100, baseInstant,
HoodieTableType.MERGE_ON_READ);
- createDeltaCommitFile(basePath, baseInstant,"2016/05/01",
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
+ createDeltaCommitFile(basePath, baseInstant, "2016/05/01",
"2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
@@ -896,7 +906,7 @@ public class TestHoodieRealtimeRecordReader {
InputFormatTestUtil.setupIncremental(baseJobConf, "100", 10);
// verify that incremental reads do NOT show inserts after compaction
timestamp
- HoodieParquetRealtimeInputFormat inputFormat = new
HoodieParquetRealtimeInputFormat();
+ HoodieParquetRealtimeInputFormat inputFormat = new
HoodieParquetRealtimeInputFormat();
inputFormat.setConf(baseJobConf);
InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
assertTrue(splits.length == 0);
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index c79fe436f95..4207e3bf113 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -77,7 +77,7 @@ public class InputFormatTestUtil {
}
public static File prepareCustomizedTable(java.nio.file.Path basePath,
HoodieFileFormat baseFileFormat, int numberOfFiles,
- String commitNumber, boolean
useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema
schema)
+ String commitNumber, boolean
useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema
schema)
throws IOException {
if (useNonPartitionedKeyGen) {
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(),
basePath.toString(), HoodieTableType.COPY_ON_WRITE,
@@ -107,7 +107,7 @@ public class InputFormatTestUtil {
}
public static File prepareMultiPartitionTable(java.nio.file.Path basePath,
HoodieFileFormat baseFileFormat, int numberOfFiles,
- String commitNumber, String
finalLevelPartitionName)
+ String commitNumber, String
finalLevelPartitionName)
throws IOException {
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(),
basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat);
@@ -178,15 +178,15 @@ public class InputFormatTestUtil {
public static void setupIncremental(JobConf jobConf, String startCommit, int
numberOfCommitsToPull, String databaseName, boolean isIncrementalUseDatabase) {
String modePropertyName =
- String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN,
databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN,
databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
String startCommitTimestampName =
- String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN,
databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN,
databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(startCommitTimestampName, startCommit);
String maxCommitPulls =
- String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN,
databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, databaseName
+ "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE,
isIncrementalUseDatabase);
@@ -202,7 +202,7 @@ public class InputFormatTestUtil {
public static void setupSnapshotMaxCommitTimeQueryMode(JobConf jobConf,
String maxInstantTime) {
setUpScanMode(jobConf);
String validateTimestampName =
- String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT,
HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+ String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT,
HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(validateTimestampName, maxInstantTime);
}
@@ -224,7 +224,7 @@ public class InputFormatTestUtil {
}
public static File prepareParquetTable(java.nio.file.Path basePath, Schema
schema, int numberOfFiles,
- int numberOfRecords, String commitNumber) throws IOException {
+ int numberOfRecords, String
commitNumber) throws IOException {
return prepareParquetTable(basePath, schema, numberOfFiles,
numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE);
}
@@ -241,13 +241,13 @@ public class InputFormatTestUtil {
}
public static File prepareSimpleParquetTable(java.nio.file.Path basePath,
Schema schema, int numberOfFiles,
- int numberOfRecords, String commitNumber) throws Exception {
+ int numberOfRecords, String
commitNumber) throws Exception {
return prepareSimpleParquetTable(basePath, schema, numberOfFiles,
numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE);
}
public static File prepareSimpleParquetTable(java.nio.file.Path basePath,
Schema schema, int numberOfFiles,
int numberOfRecords, String
commitNumber, HoodieTableType tableType) throws Exception {
- return prepareSimpleParquetTable(basePath, schema, numberOfFiles,
numberOfRecords, commitNumber, tableType, "2016","05","01");
+ return prepareSimpleParquetTable(basePath, schema, numberOfFiles,
numberOfRecords, commitNumber, tableType, "2016", "05", "01");
}
public static File prepareSimpleParquetTable(java.nio.file.Path basePath,
Schema schema, int numberOfFiles,
@@ -263,7 +263,7 @@ public class InputFormatTestUtil {
}
public static File prepareNonPartitionedParquetTable(java.nio.file.Path
basePath, Schema schema, int numberOfFiles,
- int numberOfRecords, String commitNumber) throws IOException {
+ int numberOfRecords,
String commitNumber) throws IOException {
return prepareNonPartitionedParquetTable(basePath, schema, numberOfFiles,
numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE);
}
@@ -275,7 +275,7 @@ public class InputFormatTestUtil {
}
public static List<File>
prepareMultiPartitionedParquetTable(java.nio.file.Path basePath, Schema schema,
- int numberPartitions, int numberOfRecordsPerPartition, String
commitNumber, HoodieTableType tableType) throws IOException {
+ int
numberPartitions, int numberOfRecordsPerPartition, String commitNumber,
HoodieTableType tableType) throws IOException {
List<File> result = new ArrayList<>();
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(),
basePath.toString(), tableType, HoodieFileFormat.PARQUET);
for (int i = 0; i < numberPartitions; i++) {
@@ -290,7 +290,7 @@ public class InputFormatTestUtil {
}
private static void createData(Schema schema, java.nio.file.Path
partitionPath, int numberOfFiles, int numberOfRecords,
- String commitNumber) throws IOException {
+ String commitNumber) throws IOException {
AvroParquetWriter parquetWriter;
for (int i = 0; i < numberOfFiles; i++) {
String fileId = FSUtils.makeBaseFileName(commitNumber, TEST_WRITE_TOKEN,
"fileid" + i, HoodieFileFormat.PARQUET.getFileExtension());
@@ -305,8 +305,7 @@ public class InputFormatTestUtil {
}
}
- private static void createSimpleData(Schema schema, java.nio.file.Path
partitionPath, int numberOfFiles, int numberOfRecords,
- String commitNumber) throws Exception {
+ private static void createSimpleData(Schema schema, java.nio.file.Path
partitionPath, int numberOfFiles, int numberOfRecords, String commitNumber)
throws Exception {
AvroParquetWriter parquetWriter;
for (int i = 0; i < numberOfFiles; i++) {
String fileId = FSUtils.makeBaseFileName(commitNumber, "1", "fileid" +
i, HoodieFileFormat.PARQUET.getFileExtension());
@@ -328,7 +327,7 @@ public class InputFormatTestUtil {
}
private static Iterable<? extends GenericRecord> generateAvroRecords(Schema
schema, int numberOfRecords,
- String instantTime, String fileId) throws IOException {
+ String
instantTime, String fileId) throws IOException {
List<GenericRecord> records = new ArrayList<>(numberOfRecords);
for (int i = 0; i < numberOfRecords; i++) {
records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i,
instantTime, fileId));
@@ -337,7 +336,7 @@ public class InputFormatTestUtil {
}
public static void simulateParquetUpdates(File directory, Schema schema,
String originalCommit,
- int totalNumberOfRecords, int numberOfRecordsToUpdate, String newCommit)
throws IOException {
+ int totalNumberOfRecords, int
numberOfRecordsToUpdate, String newCommit) throws IOException {
File fileToUpdate = Objects.requireNonNull(directory.listFiles((dir, name)
-> name.endsWith("parquet")))[0];
String fileId = FSUtils.getFileId(fileToUpdate.getName());
File dataFile = new File(directory,
@@ -410,8 +409,7 @@ public class InputFormatTestUtil {
}
public static HoodieLogFormat.Writer writeRollbackBlockToLogFile(File
partitionDir, FileSystem fs, Schema schema,
- String
- fileId, String baseCommit, String newCommit, String oldCommit, int
logVersion)
+ String
fileId, String baseCommit, String newCommit, String oldCommit, int logVersion)
throws InterruptedException, IOException {
HoodieLogFormat.Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(new
Path(partitionDir.getPath()))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).overBaseCommit(baseCommit)
@@ -429,7 +427,7 @@ public class InputFormatTestUtil {
}
public static void setProjectFieldsForInputFormat(JobConf jobConf,
- Schema schema, String hiveColumnTypes) {
+ Schema schema, String
hiveColumnTypes) {
List<Schema.Field> fields = schema.getFields();
String names = fields.stream().map(f ->
f.name().toString()).collect(Collectors.joining(","));
String positions = fields.stream().map(f ->
String.valueOf(f.pos())).collect(Collectors.joining(","));
@@ -456,7 +454,7 @@ public class InputFormatTestUtil {
}
public static void setPropsForInputFormat(JobConf jobConf,
- Schema schema, String hiveColumnTypes) {
+ Schema schema, String
hiveColumnTypes) {
List<Schema.Field> fields = schema.getFields();
String names = fields.stream().map(f ->
f.name().toString()).collect(Collectors.joining(","));
String positions = fields.stream().map(f ->
String.valueOf(f.pos())).collect(Collectors.joining(","));
@@ -484,18 +482,19 @@ public class InputFormatTestUtil {
Files.createDirectories(partitionPath);
// Create partition metadata to properly setup table's partition
- RawLocalFileSystem lfs = new RawLocalFileSystem();
- lfs.setConf(HoodieTestUtils.getDefaultHadoopConf());
-
- HoodiePartitionMetadata partitionMetadata =
- new HoodiePartitionMetadata(
- new LocalFileSystem(lfs),
- "0",
- new Path(basePath.toAbsolutePath().toString()),
- new Path(partitionPath.toAbsolutePath().toString()),
- Option.of(HoodieFileFormat.PARQUET));
-
- partitionMetadata.trySave((int) (Math.random() * 1000));
+ try (RawLocalFileSystem lfs = new RawLocalFileSystem()) {
+ lfs.setConf(HoodieTestUtils.getDefaultHadoopConf());
+
+ HoodiePartitionMetadata partitionMetadata =
+ new HoodiePartitionMetadata(
+ new LocalFileSystem(lfs),
+ "0",
+ new Path(basePath.toAbsolutePath().toString()),
+ new Path(partitionPath.toAbsolutePath().toString()),
+ Option.of(HoodieFileFormat.PARQUET));
+
+ partitionMetadata.trySave((int) (Math.random() * 1000));
+ }
}
public static void setInputPath(JobConf jobConf, String inputPath) {