This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 594da28  [HUDI-595] code cleanup, refactoring code out of PR# 1159 (#1302)
594da28 is described below
commit 594da28fbf64fb20432e718a409577fd10516c4a
Author: Suneel Marthi <[email protected]>
AuthorDate: Tue Feb 4 14:52:03 2020 +0100
[HUDI-595] code cleanup, refactoring code out of PR# 1159 (#1302)
---
.../org/apache/hudi/index/hbase/HBaseIndex.java | 23 ++-------
.../org/apache/hudi/io/HoodieCommitArchiveLog.java | 6 +--
.../io/compact/strategy/CompactionStrategy.java | 8 +--
.../main/java/org/apache/hudi/metrics/Metrics.java | 17 +++----
.../src/test/java/org/apache/hudi/TestCleaner.java | 37 +++++++-------
.../hudi/common/HoodieTestDataGenerator.java | 14 ++----
.../index/bloom/TestHoodieGlobalBloomIndex.java | 51 ++++++++++++-------
.../org/apache/hudi/common/HoodieJsonPayload.java | 5 +-
.../hudi/common/table/log/HoodieLogFileReader.java | 33 ++++++------
.../java/org/apache/hudi/common/util/FSUtils.java | 2 +-
.../hudi/common/minicluster/HdfsTestService.java | 2 +-
.../hudi/common/table/log/TestHoodieLogFormat.java | 26 +++-------
.../table/view/TestHoodieTableFileSystemView.java | 10 ++--
.../common/util/collection/TestDiskBasedMap.java | 6 +--
.../hadoop/hive/HoodieCombineHiveInputFormat.java | 9 ++--
.../realtime/TestHoodieRealtimeRecordReader.java | 5 +-
.../java/org/apache/hudi/hive/util/SchemaUtil.java | 31 +++++-------
.../org/apache/hudi/hive/TestHiveSyncTool.java | 4 +-
.../org/apache/hudi/hive/util/HiveTestService.java | 11 ++--
.../org/apache/hudi/integ/ITTestHoodieDemo.java | 58 +++++++++++-----------
.../apache/hudi/utilities/HDFSParquetImporter.java | 7 ++-
.../hudi/utilities/perf/TimelineServerPerf.java | 20 +++-----
.../sources/helpers/IncrSourceHelper.java | 4 +-
.../utilities/sources/helpers/KafkaOffsetGen.java | 7 +--
24 files changed, 172 insertions(+), 224 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index 3f79096..12d352d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -205,9 +205,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
}
}
List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
- HTable hTable = null;
- try {
- hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
+ try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName))) {
List<Get> statements = new ArrayList<>();
List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
// Do the tagging.
@@ -250,15 +248,6 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
}
} catch (IOException e) {
throw new HoodieIndexException("Failed to Tag indexed locations because of exception with HBase Client", e);
- } finally {
- if (hTable != null) {
- try {
- hTable.close();
- } catch (IOException e) {
- // Ignore
- }
- }
-
}
return taggedRecords.iterator();
};
@@ -444,16 +433,14 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
*/
public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer, int numTasksDuringPut,
int maxExecutors, int sleepTimeMs, float qpsFraction) {
- int numRSAlive = numRegionServersForTable;
- int maxReqPerSec = (int) (qpsFraction * numRSAlive * maxQpsPerRegionServer);
- int numTasks = numTasksDuringPut;
- int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors));
+ int maxReqPerSec = (int) (qpsFraction * numRegionServersForTable * maxQpsPerRegionServer);
+ int maxParallelPuts = Math.max(1, Math.min(numTasksDuringPut, maxExecutors));
int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs;
int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec));
LOG.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction);
- LOG.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive);
+ LOG.info("HbaseIndexThrottling: numRSAlive :" +
numRegionServersForTable);
LOG.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec);
- LOG.info("HbaseIndexThrottling: numTasks :" + numTasks);
+ LOG.info("HbaseIndexThrottling: numTasks :" + numTasksDuringPut);
LOG.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors);
LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts);
LOG.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" +
maxReqsSentPerTaskPerSec);
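For context: the hunk above swaps a manual try/finally close for try-with-resources. A minimal standalone sketch of that idiom, using a plain file reader rather than the HBase client (illustrative only, not Hudi code):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class TryWithResourcesSketch {
  // Reads the first line of a file; the reader is closed automatically when the
  // block exits, even on exception, replacing an explicit finally { close(); }.
  static String firstLine(String path) throws IOException {
    try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
      return reader.readLine();
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(firstLine(args[0]));
  }
}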
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java
index bafbc8d..6847a24 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java
@@ -147,9 +147,9 @@ public class HoodieCommitArchiveLog {
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
.getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants()
- .collect(Collectors.groupingBy(s -> s.getAction())).entrySet().stream().map(i -> {
- if (i.getValue().size() > maxCommitsToKeep) {
- return i.getValue().subList(0, i.getValue().size() - minCommitsToKeep);
+ .collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream().map(hoodieInstants -> {
+ if (hoodieInstants.size() > maxCommitsToKeep) {
+ return hoodieInstants.subList(0, hoodieInstants.size() - minCommitsToKeep);
} else {
return new ArrayList<HoodieInstant>();
}
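For context: the archive-log hunk above replaces a lambda with the HoodieInstant::getAction method reference and streams over values() instead of entrySet(). A small self-contained sketch of the same stream shape, using plain strings instead of HoodieInstant (illustrative only):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingBySketch {
  public static void main(String[] args) {
    List<String> actions = Arrays.asList("clean", "commit", "clean", "rollback");

    // Group by an extracted key via a method reference, then work on the
    // grouped values directly instead of streaming over entrySet().
    Map<Integer, List<String>> byLength =
        actions.stream().collect(Collectors.groupingBy(String::length));

    byLength.values().stream()
        .map(group -> group.subList(0, Math.max(0, group.size() - 1)))
        .forEach(System.out::println);
  }
}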
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java
index 4c03116..dd17212 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java
@@ -62,10 +62,10 @@ public abstract class CompactionStrategy implements Serializable {
public Map<String, Double> captureMetrics(HoodieWriteConfig writeConfig, Option<HoodieBaseFile> dataFile,
String partitionPath, List<HoodieLogFile> logFiles) {
Map<String, Double> metrics = Maps.newHashMap();
- Long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
+ long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
// Total size of all the log files
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0)
- .reduce((size1, size2) -> size1 + size2).orElse(0L);
+ .reduce(Long::sum).orElse(0L);
// Total read will be the base file + all the log files
Long totalIORead = FSUtils.getSizeInMB((dataFile.isPresent() ? dataFile.get().getFileSize() : 0L) + totalLogFileSize);
@@ -73,11 +73,11 @@ public abstract class CompactionStrategy implements Serializable {
Long totalIOWrite = FSUtils.getSizeInMB(dataFile.isPresent() ? dataFile.get().getFileSize() : defaultMaxParquetFileSize);
// Total IO will the the IO for read + write
- Long totalIO = totalIORead + totalIOWrite;
+ long totalIO = totalIORead + totalIOWrite;
// Save these metrics and we will use during the filter
metrics.put(TOTAL_IO_READ_MB, totalIORead.doubleValue());
metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite.doubleValue());
- metrics.put(TOTAL_IO_MB, totalIO.doubleValue());
+ metrics.put(TOTAL_IO_MB, (double) totalIO);
metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue());
metrics.put(TOTAL_LOG_FILES, (double) logFiles.size());
return metrics;
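For context: the compaction-strategy hunk replaces (size1, size2) -> size1 + size2 with Long::sum. A minimal illustration of that reduction over hypothetical file sizes (not the Hudi metrics code):

import java.util.Arrays;
import java.util.List;

public class ReduceSumSketch {
  public static void main(String[] args) {
    List<Long> logFileSizes = Arrays.asList(128L, -1L, 512L, 64L);

    // Filter out negative (unknown) sizes and sum the rest; Long::sum is the
    // method-reference form of (a, b) -> a + b, defaulting to 0 when empty.
    long totalLogFileSize = logFileSizes.stream()
        .filter(size -> size >= 0)
        .reduce(Long::sum)
        .orElse(0L);

    System.out.println("total = " + totalLogFileSize);
  }
}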
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
index 4b19441..533208f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -49,17 +49,14 @@ public class Metrics {
}
// reporter.start();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- try {
- reporter.report();
- Closeables.close(reporter.getReporter(), true);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ reporter.report();
+ Closeables.close(reporter.getReporter(), true);
+ } catch (Exception e) {
+ e.printStackTrace();
}
- });
+ }));
}
public static Metrics getInstance() {
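For context: this hunk (and the HoodieLogFileReader one further down) turns an anonymous Thread subclass into a Thread built from a lambda Runnable. A standalone sketch of the pattern (illustrative only):

public class ShutdownHookSketch {
  public static void main(String[] args) {
    // A Runnable lambda passed to the Thread constructor replaces the
    // anonymous "new Thread() { public void run() { ... } }" subclass.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      try {
        System.out.println("flushing reporters / closing streams on JVM exit");
      } catch (Exception e) {
        e.printStackTrace();
      }
    }));

    System.out.println("main finished; shutdown hook runs next");
  }
}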
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
index 24aa9cd..662273a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
@@ -68,6 +68,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -106,7 +107,7 @@ public class TestCleaner extends TestHoodieClientBase {
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
- /**
+ /*
* do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
* in insert(), if the implementation diverges.)
*/
@@ -606,8 +607,8 @@ public class TestCleaner extends TestHoodieClientBase {
String filePath2 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName2;
List<String> deletePathPatterns1 = Arrays.asList(filePath1, filePath2);
- List<String> successDeleteFiles1 = Arrays.asList(filePath1);
- List<String> failedDeleteFiles1 = Arrays.asList(filePath2);
+ List<String> successDeleteFiles1 = Collections.singletonList(filePath1);
+ List<String> failedDeleteFiles1 = Collections.singletonList(filePath2);
// create partition1 clean stat.
HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
@@ -630,7 +631,8 @@ public class TestCleaner extends TestHoodieClientBase {
// map with relative path.
Map<String, Tuple3> newExpected = new HashMap<>();
- newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Arrays.asList(fileName1), Arrays.asList(fileName2)));
+ newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Collections.singletonList(fileName1),
+ Collections.singletonList(fileName2)));
newExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2));
HoodieCleanMetadata metadata =
@@ -1079,19 +1081,18 @@ public class TestCleaner extends TestHoodieClientBase {
});
// Test for progress (Did we clean some files ?)
- long numFilesUnderCompactionDeleted = hoodieCleanStats.stream().flatMap(cleanStat -> {
- return convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns())
- .map(fileIdWithCommitTime -> {
- if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
- Assert.assertTrue("Deleted instant time must be less than pending compaction",
- HoodieTimeline.compareTimestamps(
- fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
- fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER));
- return true;
- }
- return false;
- });
- }).filter(x -> x).count();
+ long numFilesUnderCompactionDeleted = hoodieCleanStats.stream()
+ .flatMap(cleanStat -> convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns())
+ .map(fileIdWithCommitTime -> {
+ if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
+ Assert.assertTrue("Deleted instant time must be less than pending compaction",
+ HoodieTimeline.compareTimestamps(
+ fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
+ fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER));
+ return true;
+ }
+ return false;
+ })).filter(x -> x).count();
long numDeleted =
hoodieCleanStats.stream().mapToLong(cleanStat -> cleanStat.getDeletePathPatterns().size()).sum();
// Tighter check for regression
@@ -1123,7 +1124,7 @@ public class TestCleaner extends TestHoodieClientBase {
* @throws IOException in case of error
*/
private int getTotalTempFiles() throws IOException {
- RemoteIterator itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
+ RemoteIterator<?> itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
int count = 0;
while (itr.hasNext()) {
count++;
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 56b3c07..e0d2a53 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -210,13 +210,10 @@ public class HoodieTestDataGenerator {
Path commitFile =
new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
FileSystem fs = FSUtils.getFs(basePath, configuration);
- FSDataOutputStream os = fs.create(commitFile, true);
- HoodieCompactionPlan workload = new HoodieCompactionPlan();
- try {
+ try (FSDataOutputStream os = fs.create(commitFile, true)) {
+ HoodieCompactionPlan workload = new HoodieCompactionPlan();
// Write empty commit metadata
os.writeBytes(new String(AvroUtils.serializeCompactionPlan(workload).get(), StandardCharsets.UTF_8));
- } finally {
- os.close();
}
}
@@ -225,13 +222,10 @@ public class HoodieTestDataGenerator {
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeSavePointFileName(commitTime));
FileSystem fs = FSUtils.getFs(basePath, configuration);
- FSDataOutputStream os = fs.create(commitFile, true);
- HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
- try {
+ try (FSDataOutputStream os = fs.create(commitFile, true)) {
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// Write empty commit metadata
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
- } finally {
- os.close();
}
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 15a8af7..c605654 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -246,13 +247,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
String filename0 =
- HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Arrays.asList(record1), schema, null, false);
+ HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1),
+ schema, null, false);
String filename1 =
- HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(), schema, null, false);
+ HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(),
+ schema, null, false);
String filename2 =
- HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record2), schema, null, false);
+ HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2),
+ schema, null, false);
String filename3 =
- HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false);
+ HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4),
+ schema, null, false);
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -265,21 +270,29 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);
for (HoodieRecord record : taggedRecordRDD.collect()) {
- if (record.getRecordKey().equals("000")) {
- assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename0)));
- assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData());
- } else if (record.getRecordKey().equals("001")) {
- assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2)));
- assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData());
- } else if (record.getRecordKey().equals("002")) {
- assertTrue(!record.isCurrentLocationKnown());
- assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData());
- } else if (record.getRecordKey().equals("003")) {
- assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
- assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData());
- } else if (record.getRecordKey().equals("004")) {
- assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
- assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData());
+ switch (record.getRecordKey()) {
+ case "000":
+ assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename0));
+ assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData());
+ break;
+ case "001":
+ assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2));
+ assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData());
+ break;
+ case "002":
+ assertFalse(record.isCurrentLocationKnown());
+ assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData());
+ break;
+ case "003":
+ assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
+ assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData());
+ break;
+ case "004":
+ assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
+ assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData());
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown Key: " + record.getRecordKey());
}
}
}
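For context: the hunk above rewrites an if/else-if chain over record keys into a switch with a fail-fast default. A tiny sketch of that control-flow shape with made-up keys (not the actual test):

public class SwitchOverKeysSketch {
  static String describe(String recordKey) {
    // A switch on the key replaces repeated key.equals(...) branches and makes
    // the "unknown key" case explicit instead of silently falling through.
    switch (recordKey) {
      case "000":
        return "tagged to file0";
      case "001":
        return "tagged to file2";
      case "002":
        return "location unknown";
      default:
        throw new IllegalArgumentException("Unknown Key: " + recordKey);
    }
  }

  public static void main(String[] args) {
    System.out.println(describe("001"));
  }
}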
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java
index 9e95fd8..1c15c66 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java
@@ -86,11 +86,8 @@ public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload>
}
private String unCompressData(byte[] data) throws IOException {
- InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data));
- try {
+ try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) {
return FileIOUtils.readAsUTFString(iis, dataSize);
- } finally {
- iis.close();
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 354f809..40a5243 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -46,6 +46,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/**
 * Scans a log file and provides block level iterator on the log file Loads the entire block contents in memory Can emit
@@ -107,25 +108,22 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
* Close the inputstream if not closed when the JVM exits.
*/
private void addShutDownHook() {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- try {
- close();
- } catch (Exception e) {
- LOG.warn("unable to close input stream for log file " + logFile, e);
- // fail silently for any sort of exception
- }
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ close();
+ } catch (Exception e) {
+ LOG.warn("unable to close input stream for log file " + logFile, e);
+ // fail silently for any sort of exception
}
- });
+ }));
}
// TODO : convert content and block length to long by using ByteBuffer, raw byte [] allows
// for max of Integer size
private HoodieLogBlock readBlock() throws IOException {
- int blocksize = -1;
- int type = -1;
+ int blocksize;
+ int type;
HoodieLogBlockType blockType = null;
Map<HeaderMetadataType, String> header = null;
@@ -190,7 +188,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
// 9. Read the log block end position in the log file
long blockEndPos = inputStream.getPos();
- switch (blockType) {
+ switch (Objects.requireNonNull(blockType)) {
// based on type read the block
case AVRO_DATA_BLOCK:
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
@@ -278,10 +276,10 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
}
}
- @Override
- /**
+ /*
* hasNext is not idempotent. TODO - Fix this. It is okay for now - PR
*/
+ @Override
public boolean hasNext() {
try {
return readMagic();
@@ -315,10 +313,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
long pos = inputStream.getPos();
// 1. Read magic header from the start of the block
inputStream.readFully(MAGIC_BUFFER, 0, 6);
- if (!Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC)) {
- return false;
- }
- return true;
+ return Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
index 43b0030..87925c7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
@@ -193,7 +193,7 @@ public class FSUtils {
return partitions;
}
- public static final List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
+ public static List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
String markerDir) throws IOException {
List<String> dataFiles = new LinkedList<>();
processFiles(fs, markerDir, (status) -> {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
index 9bc9a8d..7fb3bfd 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
@@ -79,7 +79,7 @@ public class HdfsTestService {
// Configure and start the HDFS cluster
// boolean format = shouldFormatDFSCluster(localDFSLocation, clean);
- hadoopConf = configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort,
+ configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort,
datanodePort, datanodeIpcPort, datanodeHttpPort);
miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format).checkDataNodeAddrConfig(true)
.checkDataNodeHostConfig(true).build();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
index 1b9667c..6c01d4a 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
@@ -285,7 +285,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
}
}
- /**
+ /*
 * This is actually a test on concurrent append and not recovery lease. Commenting this out.
* https://issues.apache.org/jira/browse/HUDI-117
*/
@@ -337,7 +335,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertEquals(2, statuses.length);
}
- @SuppressWarnings("unchecked")
@Test
public void testBasicWriteAndScan() throws IOException, URISyntaxException, InterruptedException {
Writer writer =
@@ -366,7 +365,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
reader.close();
}
- @SuppressWarnings("unchecked")
@Test
public void testBasicAppendAndRead() throws IOException, URISyntaxException, InterruptedException {
Writer writer =
@@ -434,7 +432,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
reader.close();
}
- @SuppressWarnings("unchecked")
@Test
public void testBasicAppendAndScanMultipleFiles() throws IOException, URISyntaxException, InterruptedException {
Writer writer =
@@ -911,11 +908,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
- List<String> originalKeys =
- copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
- .collect(Collectors.toList());
-
- // Delete 50 keys
// Delete 50 keys
List<HoodieKey> deletedKeys = copyOfRecords1.stream()
.map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
@@ -1127,8 +1119,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
* duplicate data.
*
*/
- private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2)
- throws IOException, URISyntaxException, InterruptedException {
+ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2) {
try {
// Write one Data block with same InstantTime (written in same batch)
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -1178,8 +1169,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
}
@Test
- public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt()
- throws IOException, URISyntaxException, InterruptedException {
+ public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt() {
/*
* FIRST_ATTEMPT_FAILED:
 * Original task from the stage attempt failed, but subsequent stage retry succeeded.
@@ -1188,8 +1178,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
}
@Test
- public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt()
- throws IOException, URISyntaxException, InterruptedException {
+ public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt() {
/*
* SECOND_ATTEMPT_FAILED:
 * Original task from stage attempt succeeded, but subsequent retry attempt failed.
@@ -1198,8 +1187,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
}
@Test
- public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts()
- throws IOException, URISyntaxException, InterruptedException {
+ public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts() {
/*
* BOTH_ATTEMPTS_SUCCEEDED:
 * Original task from the stage attempt and duplicate task from the stage retry succeeded.
@@ -1207,7 +1195,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
testAvroLogRecordReaderMergingMultipleLogFiles(100, 100);
}
- @SuppressWarnings("unchecked")
@Test
public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException {
Writer writer =
@@ -1335,7 +1322,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
reader.close();
}
- @SuppressWarnings("unchecked")
@Test
public void testBasicAppendAndTraverseInReverse() throws IOException, URISyntaxException, InterruptedException {
Writer writer =
@@ -1392,7 +1378,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
}
@Test
- public void testV0Format() throws IOException, InterruptedException, URISyntaxException {
+ public void testV0Format() throws IOException, URISyntaxException {
// HoodieLogFormatVersion.DEFAULT_VERSION has been deprecated so we cannot
// create a writer for it. So these tests are only for the HoodieAvroDataBlock
// of older version.
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 2b8f04f..0a910e9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -221,7 +221,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
*/
public Stream<FileSlice> getLatestRawFileSlices(String partitionPath) {
return fsView.getAllFileGroups(partitionPath).map(HoodieFileGroup::getLatestFileSlicesIncludingInflight)
- .filter(fileSliceOpt -> fileSliceOpt.isPresent()).map(Option::get);
+ .filter(Option::isPresent).map(Option::get);
}
/**
@@ -322,7 +322,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
assertEquals("Expect only valid data-file", dataFileName, dataFiles.get(0).getFileName());
}
- /** Merge API Tests **/
+ // Merge API Tests
List<FileSlice> fileSliceList =
rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList());
assertEquals("Expect file-slice to be merged", 1, fileSliceList.size());
@@ -355,7 +355,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
- /** Data Files API tests */
+ // Data Files API tests
dataFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
if (skipCreatingDataFile) {
assertEquals("Expect no data file to be returned", 0, dataFiles.size());
@@ -385,7 +385,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
dataFiles.forEach(df -> assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1));
}
- /** Inflight/Orphan File-groups needs to be in the view **/
+ // Inflight/Orphan File-groups needs to be in the view
// There is a data-file with this inflight file-id
final String inflightFileId1 = UUID.randomUUID().toString();
@@ -507,7 +507,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
- /** Data Files API tests */
+ // Data Files API tests
dataFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
dataFiles.forEach(df -> assertEquals("Expect data-file created by
compaction be returned", df.getCommitTime(), compactionRequestedTime));
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java
index 76d7e06..2cc726e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java
@@ -167,7 +167,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
schema = SchemaTestUtil.getSimpleSchema();
List<IndexedRecord> indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
hoodieRecords =
- indexedRecords.stream().map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+ indexedRecords.stream().map(r -> new HoodieRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
payloadSize =
SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), new HoodieRecordSizeEstimator(schema));
assertTrue(payloadSize > 0);
@@ -176,7 +176,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
final Schema simpleSchemaWithMetadata =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
hoodieRecords = indexedRecords.stream()
- .map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+ .map(r -> new HoodieRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
new AvroBinaryTestPayload(
Option.of(HoodieAvroUtils.rewriteRecord((GenericRecord) r, simpleSchemaWithMetadata)))))
.collect(Collectors.toList());
@@ -193,7 +193,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
// Test sizeEstimatorPerformance with simpleSchema
Schema schema = SchemaTestUtil.getSimpleSchema();
List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
- HoodieRecordSizeEstimator sizeEstimator = new HoodieRecordSizeEstimator(schema);
+ HoodieRecordSizeEstimator sizeEstimator = new HoodieRecordSizeEstimator<>(schema);
HoodieRecord record = hoodieRecords.remove(0);
long startTime = System.currentTimeMillis();
SpillableMapUtils.computePayloadSize(record, sizeEstimator);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index 506b6cf..0c3f141 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -333,8 +333,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
if (o instanceof CombinePathInputFormat) {
CombinePathInputFormat mObj = (CombinePathInputFormat) o;
return (opList.equals(mObj.opList)) && (inputFormatClassName.equals(mObj.inputFormatClassName))
- && (deserializerClassName == null ? (mObj.deserializerClassName == null)
- : deserializerClassName.equals(mObj.deserializerClassName));
+ && (Objects.equals(deserializerClassName, mObj.deserializerClassName));
}
return false;
}
@@ -353,16 +352,16 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
init(job);
Map<Path, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
Map<String, Operator<? extends OperatorDesc>> aliasToWork = mrwork.getAliasToWork();
- /** MOD - Initialize a custom combine input format shim that will call listStatus on the custom inputFormat **/
+ /* MOD - Initialize a custom combine input format shim that will call listStatus on the custom inputFormat **/
HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim combine =
- new HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim();
+ new HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim<>();
InputSplit[] splits;
if (combine.getInputPathsShim(job).length == 0) {
throw new IOException("No input paths specified in job");
}
- ArrayList<InputSplit> result = new ArrayList<>();
+ List<InputSplit> result = new ArrayList<>();
// combine splits only from same tables and same partitions. Do not combine splits from multiple
// tables or multiple partitions.
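For context: the equals() hunk above collapses a hand-rolled null check into Objects.equals. A minimal sketch of that null-safe comparison (illustrative values only):

import java.util.Objects;

public class ObjectsEqualsSketch {
  public static void main(String[] args) {
    String a = null;
    String b = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";

    // Objects.equals(a, b) is equivalent to
    // (a == null ? b == null : a.equals(b)) but reads as a single call.
    System.out.println(Objects.equals(a, b));    // false
    System.out.println(Objects.equals(a, null)); // true
  }
}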
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 89b7168..0586bc4 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -210,7 +210,7 @@ public class TestHoodieRealtimeRecordReader {
action.equals(HoodieTimeline.ROLLBACK_ACTION) ? String.valueOf(baseInstantTs + logVersion - 2)
: instantTime;
- HoodieLogFormat.Writer writer = null;
+ HoodieLogFormat.Writer writer;
if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
writer = writeRollback(partitionDir, schema, "fileid0", baseInstant, instantTime,
String.valueOf(baseInstantTs + logVersion - 1), logVersion);
@@ -317,7 +317,7 @@ public class TestHoodieRealtimeRecordReader {
numRecordsAtCommit2++;
Assert.assertTrue(gotKey > firstBatchLastRecordKey);
Assert.assertTrue(gotKey <= secondBatchLastRecordKey);
- assertEquals((int) gotKey, lastSeenKeyFromLog + 1);
+ assertEquals(gotKey, lastSeenKeyFromLog + 1);
lastSeenKeyFromLog++;
} else {
numRecordsAtCommit1++;
@@ -491,7 +491,6 @@ public class TestHoodieRealtimeRecordReader {
writer = writeRollbackBlockToLogFile(partitionDir, schema, "fileid0", commitTime, newCommitTime, "101", 1);
logFilePaths.add(writer.getLogFile().getPath().toString());
writer.close();
- assertTrue("block - size should be > 0", size > 0);
InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
// create a split with baseFile (parquet file written earlier) and new log file(s)
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
index d85778a..6ca9957 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
@@ -18,6 +18,10 @@
package org.apache.hudi.hive.util;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -26,11 +30,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.SchemaDifference;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -46,7 +45,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
/**
* Schema Utilities.
@@ -367,10 +365,9 @@ public class SchemaUtil {
return true;
} else if (prevType.equalsIgnoreCase("float") && newType.equalsIgnoreCase("double")) {
return true;
- } else if (prevType.contains("struct") && newType.toLowerCase().contains("struct")) {
- return true;
+ } else {
+ return prevType.contains("struct") && newType.toLowerCase().contains("struct");
}
- return false;
}
public static String generateSchemaString(MessageType storageSchema) throws IOException {
@@ -403,18 +400,17 @@ public class SchemaUtil {
.append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
}
- String partitionsStr = partitionFields.stream().collect(Collectors.joining(","));
+ String partitionsStr = String.join(",", partitionFields);
StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
- sb = sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
+ sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
.append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
- sb = sb.append("( ").append(columns).append(")");
+ sb.append("( ").append(columns).append(")");
if (!config.partitionFields.isEmpty()) {
- sb = sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
+ sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
}
- sb = sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
- sb = sb.append(" STORED AS INPUTFORMAT
'").append(inputFormatClass).append("'");
- sb = sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("'
LOCATION '").append(config.basePath)
- .append("'");
+ sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
+ sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
+ sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION
'").append(config.basePath).append("'");
return sb.toString();
}
@@ -433,7 +429,6 @@ public class SchemaUtil {
*
* @return
*/
- @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
HoodieAvroDataBlock lastBlock = null;
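For context: the SchemaUtil hunk replaces a Collectors.joining stream with String.join and drops the redundant sb = sb.append(...) reassignments, since StringBuilder.append mutates in place and returns this. A standalone sketch of the same DDL-building shape with hypothetical table and column names:

import java.util.Arrays;
import java.util.List;

public class DdlBuilderSketch {
  public static void main(String[] args) {
    List<String> partitionFields = Arrays.asList("`datestr` string", "`region` string");

    // String.join replaces partitionFields.stream().collect(Collectors.joining(","))
    String partitionsStr = String.join(",", partitionFields);

    // append() returns the same StringBuilder, so reassigning "sb = sb.append(...)"
    // is redundant; the calls can simply be issued on sb directly.
    StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
    sb.append("`demo_db`").append(".").append("`demo_table`");
    sb.append("( `key` string, `value` bigint)");
    if (!partitionFields.isEmpty()) {
      sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
    }
    System.out.println(sb);
  }
}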
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 50bb0e5..49692f5 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -38,7 +38,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
-import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -47,7 +46,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-@SuppressWarnings("ConstantConditions")
@RunWith(Parameterized.class)
public class TestHiveSyncTool {
@@ -64,7 +62,7 @@ public class TestHiveSyncTool {
}
@Before
- public void setUp() throws IOException, InterruptedException, URISyntaxException {
+ public void setUp() throws IOException, InterruptedException {
TestUtil.setUp();
}
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
index d82c33b..fc7675f 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
@@ -265,11 +265,11 @@ public class HiveTestService {
? new ChainedTTransportFactory(new TFramedTransport.Factory(), new TUGIContainingTransport.Factory())
: new TUGIContainingTransport.Factory();
- processor = new TUGIBasedProcessor<IHMSHandler>(handler);
+ processor = new TUGIBasedProcessor<>(handler);
LOG.info("Starting DB backed MetaStore Server with SetUGI enabled");
} else {
transFactory = useFramedTransport ? new TFramedTransport.Factory() : new TTransportFactory();
- processor = new TSetIpAddressProcessor<IHMSHandler>(handler);
+ processor = new TSetIpAddressProcessor<>(handler);
LOG.info("Starting DB backed MetaStore Server");
}
@@ -278,12 +278,7 @@ public class HiveTestService {
.minWorkerThreads(minWorkerThreads).maxWorkerThreads(maxWorkerThreads);
final TServer tServer = new TThreadPoolServer(args);
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- tServer.serve();
- }
- });
+ executorService.submit(tServer::serve);
return tServer;
} catch (Throwable x) {
throw new IOException(x);
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 7d11414..f61028e 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -33,35 +33,35 @@ import java.util.List;
*/
public class ITTestHoodieDemo extends ITTestBase {
- private static String HDFS_DATA_DIR = "/usr/hive/data/input";
- private static String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/batch_1.json";
- private static String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/batch_2.json";
- private static String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/presto-table-check.commands";
- private static String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/presto-batch1.commands";
- private static String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/presto-batch2-after-compaction.commands";
-
- private static String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json";
- private static String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands";
- private static String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands";
- private static String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json";
- private static String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands";
-
- private static String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow";
- private static String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor";
- private static String COW_TABLE_NAME = "stock_ticks_cow";
- private static String MOR_TABLE_NAME = "stock_ticks_mor";
-
- private static String DEMO_CONTAINER_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/setup_demo_container.sh";
- private static String MIN_COMMIT_TIME_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time.sh";
- private static String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh";
- private static String COMPACTION_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/compaction.commands";
- private static String SPARKSQL_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch1.commands";
- private static String SPARKSQL_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch2.commands";
- private static String SPARKSQL_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-incremental.commands";
- private static String HIVE_TBLCHECK_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-table-check.commands";
- private static String HIVE_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch1.commands";
- private static String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands";
- private static String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands";
+ private static final String HDFS_DATA_DIR = "/usr/hive/data/input";
+ private static final String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/batch_1.json";
+ private static final String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/batch_2.json";
+ private static final String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/presto-table-check.commands";
+ private static final String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/presto-batch1.commands";
+ private static final String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/presto-batch2-after-compaction.commands";
+
+ private static final String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json";
+ private static final String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands";
+ private static final String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands";
+ private static final String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json";
+ private static final String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands";
+
+ private static final String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow";
+ private static final String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor";
+ private static final String COW_TABLE_NAME = "stock_ticks_cow";
+ private static final String MOR_TABLE_NAME = "stock_ticks_mor";
+
+ private static final String DEMO_CONTAINER_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/setup_demo_container.sh";
+ private static final String MIN_COMMIT_TIME_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time.sh";
+ private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh";
+ private static final String COMPACTION_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/compaction.commands";
+ private static final String SPARKSQL_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch1.commands";
+ private static final String SPARKSQL_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch2.commands";
+ private static final String SPARKSQL_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-incremental.commands";
+ private static final String HIVE_TBLCHECK_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-table-check.commands";
+ private static final String HIVE_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch1.commands";
+ private static final String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands";
+ private static final String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands";
private static String HIVE_SYNC_CMD_FMT =
" --enable-hive-sync --hoodie-conf
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 2cfc914..aaddee7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -260,14 +260,13 @@ public class HDFSParquetImporter implements Serializable {
public int parallelism = 1;
@Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro
schema file", required = true)
public String schemaFile = null;
- @Parameter(names = {"--format", "-f"}, description = "Format for the input
data.", required = false,
- validateValueWith = FormatValidator.class)
+ @Parameter(names = {"--format", "-f"}, description = "Format for the input
data.", validateValueWith = FormatValidator.class)
public String format = null;
- @Parameter(names = {"--spark-master", "-ms"}, description = "Spark
master", required = false)
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
public String sparkMaster = null;
@Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = true)
public String sparkMemory = null;
- @Parameter(names = {"--retry", "-rt"}, description = "number of retries",
required = false)
+ @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
public int retry = 0;
@Parameter(names = {"--props"}, description = "path to properties file on
localfs or dfs, with configurations for "
+ "hoodie client for importing")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index 7fa0da5..e9a5f80 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -114,7 +114,7 @@ public class TimelineServerPerf implements Serializable {
d2.close();
System.out.println("\n\n\nDumping all File Slices");
- selected.stream().forEach(p -> fsView.getAllFileSlices(p).forEach(s -> System.out.println("\tMyFileSlice=" + s)));
+ selected.forEach(p -> fsView.getAllFileSlices(p).forEach(s -> System.out.println("\tMyFileSlice=" + s)));
// Waiting for curl queries
if (!useExternalTimelineServer && cfg.waitForManualQueries) {
@@ -131,17 +131,16 @@ public class TimelineServerPerf implements Serializable {
public List<PerfStats> runLookups(JavaSparkContext jsc, List<String> partitionPaths, SyncableFileSystemView fsView,
int numIterations, int concurrency) {
- List<PerfStats> perfStats = jsc.parallelize(partitionPaths, cfg.numExecutors).flatMap(p -> {
+ return jsc.parallelize(partitionPaths, cfg.numExecutors).flatMap(p -> {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(100);
final List<PerfStats> result = new ArrayList<>();
final List<ScheduledFuture<PerfStats>> futures = new ArrayList<>();
List<FileSlice> slices = fsView.getLatestFileSlices(p).collect(Collectors.toList());
String fileId = slices.isEmpty() ? "dummyId"
: slices.get(new Random(Double.doubleToLongBits(Math.random())).nextInt(slices.size())).getFileId();
- IntStream.range(0, concurrency).forEach(i -> {
- futures.add(executor.schedule(() -> runOneRound(fsView, p, fileId, i, numIterations), 0, TimeUnit.NANOSECONDS));
- });
- futures.stream().forEach(x -> {
+ IntStream.range(0, concurrency).forEach(i -> futures.add(executor.schedule(() -> runOneRound(fsView, p, fileId,
+ i, numIterations), 0, TimeUnit.NANOSECONDS)));
+ futures.forEach(x -> {
try {
result.add(x.get());
} catch (InterruptedException | ExecutionException e) {
@@ -149,12 +148,9 @@ public class TimelineServerPerf implements Serializable {
}
});
System.out.println("SLICES are=");
- slices.stream().forEach(s -> {
- System.out.println("\t\tFileSlice=" + s);
- });
+ slices.forEach(s -> System.out.println("\t\tFileSlice=" + s));
return result.iterator();
}).collect();
- return perfStats;
}
private static PerfStats runOneRound(SyncableFileSystemView fsView, String
partition, String fileId, int id,
@@ -194,7 +190,7 @@ public class TimelineServerPerf implements Serializable {
}
public void dump(List<PerfStats> stats) {
- stats.stream().forEach(x -> {
+ stats.forEach(x -> {
String row = String.format("%s,%d,%d,%d,%f,%f,%f,%f\n", x.partition, x.id, x.minTime, x.maxTime, x.meanTime,
x.medianTime, x.p75, x.p95);
System.out.println(row);
@@ -260,7 +256,7 @@ public class TimelineServerPerf implements Serializable {
@Parameter(names = {"--num-iterations", "-i"}, description = "Number of
iterations for each partitions")
public Integer numIterations = 10;
- @Parameter(names = {"--spark-master", "-ms"}, description = "Spark
master", required = false)
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
public String sparkMaster = "local[2]";
@Parameter(names = {"--server-port", "-p"}, description = " Server Port")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 54ea0f3..9787bab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -38,7 +38,7 @@ public class IncrSourceHelper {
private static String getStrictlyLowerTimestamp(String timestamp) {
long ts = Long.parseLong(timestamp);
Preconditions.checkArgument(ts > 0, "Timestamp must be positive");
- Long lower = ts - 1;
+ long lower = ts - 1;
return "" + lower;
}
@@ -73,7 +73,7 @@ public class IncrSourceHelper {
Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
.findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
- return Pair.of(beginInstantTime, nthInstant.map(instant -> instant.getTimestamp()).orElse(beginInstantTime));
+ return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
}
/**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index a92a441..4ad8855 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -94,8 +94,7 @@ public class KafkaOffsetGen {
// Create initial offset ranges for each 'to' partition, with from = to offsets.
OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
- toOffsetMap.entrySet().stream().map(e -> {
- TopicPartition tp = e.getKey();
+ toOffsetMap.keySet().stream().map(tp -> {
long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
return OffsetRange.create(tp, fromOffset, fromOffset);
}).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
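For context: the KafkaOffsetGen hunk above streams over keySet() when only the key is needed, instead of entrySet() plus getKey(). A small illustration with a plain map rather than the Kafka types (illustrative only):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeySetStreamSketch {
  public static void main(String[] args) {
    Map<String, Long> toOffsets = new HashMap<>();
    toOffsets.put("topic-0", 100L);
    toOffsets.put("topic-1", 250L);

    // When the value is never read, streaming over keySet() avoids the
    // entrySet().stream().map(e -> e.getKey()) indirection.
    List<String> ranges = toOffsets.keySet().stream()
        .map(partition -> partition + ": from=0")
        .sorted()
        .collect(Collectors.toList());

    ranges.forEach(System.out::println);
  }
}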
@@ -208,9 +207,7 @@ public class KafkaOffsetGen {
maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka;
long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
- OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
-
- return offsetRanges;
+ return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
}
// check up checkpoint offsets is valid or not, if true, return checkpoint offsets,