This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b791473 Introduce HoodieReadHandle abstraction into index
b791473 is described below
commit b791473a6d663cd4ac2ce535aaa62bcd89fea3af
Author: Vinoth Chandar <[email protected]>
AuthorDate: Tue May 21 15:37:38 2019 -0700
Introduce HoodieReadHandle abstraction into index
- Generalized BloomIndex to work with file ids instead of paths
- Abstracted away Bloom filter checking into HoodieLookupHandle
- Abstracted away range information retrieval into HoodieRangeInfoHandle
---
.../java/com/uber/hoodie/HoodieReadClient.java | 18 ++-
.../hoodie/func/CopyOnWriteLazyInsertIterable.java | 5 +-
.../java/com/uber/hoodie/index/HoodieIndex.java | 9 +-
.../com/uber/hoodie/index/InMemoryHashIndex.java | 3 +-
.../hoodie/index/bloom/BloomIndexFileInfo.java | 22 +--
.../uber/hoodie/index/bloom/HoodieBloomIndex.java | 95 ++++++-------
.../index/bloom/HoodieBloomIndexCheckFunction.java | 138 ++++--------------
.../hoodie/index/bloom/HoodieGlobalBloomIndex.java | 5 +-
.../IntervalTreeBasedGlobalIndexFileFilter.java | 4 +-
.../bloom/IntervalTreeBasedIndexFileFilter.java | 4 +-
.../bloom/ListBasedGlobalIndexFileFilter.java | 2 +-
.../index/bloom/ListBasedIndexFileFilter.java | 2 +-
.../com/uber/hoodie/index/hbase/HBaseIndex.java | 7 +-
.../com/uber/hoodie/io/HoodieAppendHandle.java | 16 +--
.../com/uber/hoodie/io/HoodieCreateHandle.java | 4 +-
.../java/com/uber/hoodie/io/HoodieIOHandle.java | 152 +-------------------
.../com/uber/hoodie/io/HoodieKeyLookupHandle.java | 158 +++++++++++++++++++++
.../java/com/uber/hoodie/io/HoodieMergeHandle.java | 76 ++++++++--
.../com/uber/hoodie/io/HoodieRangeInfoHandle.java | 43 ++++++
.../java/com/uber/hoodie/io/HoodieReadHandle.java | 59 ++++++++
...{HoodieIOHandle.java => HoodieWriteHandle.java} | 47 +++---
.../uber/hoodie/table/HoodieCopyOnWriteTable.java | 2 +-
.../java/com/uber/hoodie/table/WorkloadStat.java | 2 +-
.../java/com/uber/hoodie/TestHoodieClientBase.java | 2 +-
.../java/com/uber/hoodie/index/TestHbaseIndex.java | 2 +-
.../hoodie/index/bloom/TestHoodieBloomIndex.java | 27 ++--
.../index/bloom/TestHoodieGlobalBloomIndex.java | 41 +++---
.../hoodie/common/model/HoodieRecordLocation.java | 16 +--
.../uber/hoodie/common/model/HoodieTestUtils.java | 4 +-
.../util/collection/TestExternalSpillableMap.java | 2 +-
30 files changed, 537 insertions(+), 430 deletions(-)
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java
b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java
index e5e7239..a1e7ab7 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java
@@ -20,6 +20,7 @@ package com.uber.hoodie;
import com.google.common.base.Optional;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
@@ -119,15 +120,27 @@ public class HoodieReadClient<T extends
HoodieRecordPayload> extends AbstractHoo
}
}
+ private Optional<String> convertToDataFilePath(Optional<Pair<String,
String>> partitionPathFileIDPair) {
+ if (partitionPathFileIDPair.isPresent()) {
+ HoodieDataFile dataFile = hoodieTable.getROFileSystemView()
+ .getLatestDataFile(partitionPathFileIDPair.get().getLeft(),
partitionPathFileIDPair.get().getRight()).get();
+ return Optional.of(dataFile.getPath());
+ } else {
+ return Optional.absent();
+ }
+ }
+
/**
* Given a bunch of hoodie keys, fetches all the individual records out as a
data frame
*
* @return a dataframe
*/
- public Dataset<Row> read(JavaRDD<HoodieKey> hoodieKeys, int parallelism)
throws Exception {
+ public Dataset<Row> readROView(JavaRDD<HoodieKey> hoodieKeys, int
parallelism) {
assertSqlContext();
- JavaPairRDD<HoodieKey, Optional<String>> keyToFileRDD = index
+ JavaPairRDD<HoodieKey, Optional<Pair<String, String>>> lookupResultRDD =
index
.fetchRecordLocation(hoodieKeys, jsc, hoodieTable);
+ JavaPairRDD<HoodieKey, Optional<String>> keyToFileRDD = lookupResultRDD
+ .mapToPair(r -> new Tuple2<>(r._1, convertToDataFilePath(r._2)));
List<String> paths = keyToFileRDD.filter(keyFileTuple ->
keyFileTuple._2().isPresent())
.map(keyFileTuple -> keyFileTuple._2().get()).collect();
@@ -144,7 +157,6 @@ public class HoodieReadClient<T extends
HoodieRecordPayload> extends AbstractHoo
// Now, we need to further filter out, for only rows that match the
supplied hoodie keys
JavaRDD<Row> rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple
-> tuple._2()._1());
-
return sqlContextOpt.get().createDataFrame(rowRDD, schema);
}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
index f2e9834..f885ce2 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
@@ -26,7 +26,7 @@ import
com.uber.hoodie.common.util.queue.BoundedInMemoryQueueConsumer;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.io.HoodieCreateHandle;
-import com.uber.hoodie.io.HoodieIOHandle;
+import com.uber.hoodie.io.HoodieWriteHandle;
import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
import java.util.Iterator;
@@ -131,12 +131,11 @@ public class CopyOnWriteLazyInsertIterable<T extends
HoodieRecordPayload> extend
BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>,
List<WriteStatus>> {
protected final List<WriteStatus> statuses = new ArrayList<>();
- protected HoodieIOHandle handle;
+ protected HoodieWriteHandle handle;
@Override
protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord>
payload) {
final HoodieRecord insertPayload = payload.record;
-
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable,
insertPayload.getPartitionPath(),
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java
index e3f3578..7f4eb63 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java
@@ -23,6 +23,7 @@ import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
@@ -63,12 +64,10 @@ public abstract class HoodieIndex<T extends
HoodieRecordPayload> implements Seri
}
/**
- * Checks if the given [Keys] exists in the hoodie table and returns [Key,
Optional[FullFilePath]]
- * If the optional FullFilePath value is not present, then the key is not
found. If the
- * FullFilePath value is present, it is the path component (without scheme)
of the URI underlying
- * file
+ * Checks if the given [Keys] exists in the hoodie table and returns [Key,
Optional[partitionPath, fileID]]
+ * If the optional is empty, then the key is not found.
*/
- public abstract JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
+ public abstract JavaPairRDD<HoodieKey, Optional<Pair<String, String>>>
fetchRecordLocation(
JavaRDD<HoodieKey> hoodieKeys, final JavaSparkContext jsc,
HoodieTable<T> hoodieTable);
/**
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java
index 672fa84..61da5ab 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java
@@ -24,6 +24,7 @@ import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
@@ -55,7 +56,7 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload>
extends HoodieInde
}
@Override
- public JavaPairRDD<HoodieKey, Optional<String>>
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+ public JavaPairRDD<HoodieKey, Optional<Pair<String, String>>>
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
throw new UnsupportedOperationException("InMemory index does not implement
check exist yet");
}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java
index 1114262..2a56d8a 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java
@@ -22,30 +22,30 @@ import com.google.common.base.Objects;
import java.io.Serializable;
/**
- * Metadata about a given file, useful for index lookup
+ * Metadata about a given file group, useful for index lookup
*/
public class BloomIndexFileInfo implements Serializable {
- private final String fileName;
+ private final String fileId;
private final String minRecordKey;
private final String maxRecordKey;
- public BloomIndexFileInfo(String fileName, String minRecordKey, String
maxRecordKey) {
- this.fileName = fileName;
+ public BloomIndexFileInfo(String fileId, String minRecordKey, String
maxRecordKey) {
+ this.fileId = fileId;
this.minRecordKey = minRecordKey;
this.maxRecordKey = maxRecordKey;
}
- public BloomIndexFileInfo(String fileName) {
- this.fileName = fileName;
+ public BloomIndexFileInfo(String fileId) {
+ this.fileId = fileId;
this.minRecordKey = null;
this.maxRecordKey = null;
}
- public String getFileName() {
- return fileName;
+ public String getFileId() {
+ return fileId;
}
public String getMinRecordKey() {
@@ -77,19 +77,19 @@ public class BloomIndexFileInfo implements Serializable {
}
BloomIndexFileInfo that = (BloomIndexFileInfo) o;
- return Objects.equal(that.fileName, fileName) &&
Objects.equal(that.minRecordKey, minRecordKey)
+ return Objects.equal(that.fileId, fileId) &&
Objects.equal(that.minRecordKey, minRecordKey)
&& Objects.equal(that.maxRecordKey, maxRecordKey);
}
@Override
public int hashCode() {
- return Objects.hashCode(fileName, minRecordKey, maxRecordKey);
+ return Objects.hashCode(fileId, minRecordKey, maxRecordKey);
}
public String toString() {
final StringBuilder sb = new StringBuilder("BloomIndexFileInfo {");
- sb.append(" fileName=").append(fileName);
+ sb.append(" fileId=").append(fileId);
sb.append(" minRecordKey=").append(minRecordKey);
sb.append(" maxRecordKey=").append(maxRecordKey);
sb.append('}');
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
index 0b93b9c..195738a 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
@@ -25,26 +25,22 @@ import static java.util.stream.Collectors.toList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.uber.hoodie.WriteStatus;
-import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
-import com.uber.hoodie.common.util.FSUtils;
-import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.MetadataNotFoundException;
import com.uber.hoodie.index.HoodieIndex;
+import com.uber.hoodie.io.HoodieRangeInfoHandle;
import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -85,7 +81,8 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload>
extends HoodieIndex
.mapToPair(record -> new Tuple2<>(record.getPartitionPath(),
record.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
- JavaPairRDD<HoodieKey, String> keyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
+ JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, jsc,
+ hoodieTable);
// Cache the result, for subsequent stages.
if (config.getBloomIndexUseCaching()) {
@@ -109,27 +106,33 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
return taggedRecordRDD;
}
- public JavaPairRDD<HoodieKey, Optional<String>>
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+ /**
+ * Returns an RDD mapping each HoodieKey with a partitionPath/fileID which
contains it. Optional.Empty if the key is
+ * not found.
+ *
+ * @param hoodieKeys keys to lookup
+ * @param jsc spark context
+ * @param hoodieTable hoodie table object
+ */
+ @Override
+ public JavaPairRDD<HoodieKey, Optional<Pair<String, String>>>
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
JavaPairRDD<String, String> partitionRecordKeyPairRDD = hoodieKeys
.mapToPair(key -> new Tuple2<>(key.getPartitionPath(),
key.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
- JavaPairRDD<HoodieKey, String> keyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
+ JavaPairRDD<HoodieKey, HoodieRecordLocation> recordKeyLocationRDD =
lookupIndex(partitionRecordKeyPairRDD, jsc,
+ hoodieTable);
JavaPairRDD<HoodieKey, String> keyHoodieKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key, null));
- return
keyHoodieKeyPairRDD.leftOuterJoin(keyFilenamePairRDD).mapToPair(keyPathTuple ->
{
- Optional<String> recordLocationPath;
- if (keyPathTuple._2._2.isPresent()) {
- String fileName = keyPathTuple._2._2.get();
- String partitionPath = keyPathTuple._1.getPartitionPath();
- recordLocationPath = Optional
- .of(new Path(new Path(hoodieTable.getMetaClient().getBasePath(),
partitionPath), fileName)
- .toUri().getPath());
+ return
keyHoodieKeyPairRDD.leftOuterJoin(recordKeyLocationRDD).mapToPair(keyLoc -> {
+ Optional<Pair<String, String>> partitionPathFileidPair;
+ if (keyLoc._2._2.isPresent()) {
+ partitionPathFileidPair =
Optional.of(Pair.of(keyLoc._1().getPartitionPath(),
keyLoc._2._2.get().getFileId()));
} else {
- recordLocationPath = Optional.absent();
+ partitionPathFileidPair = Optional.absent();
}
- return new Tuple2<>(keyPathTuple._1, recordLocationPath);
+ return new Tuple2<>(keyLoc._1, partitionPathFileidPair);
});
}
@@ -137,9 +140,9 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
* Lookup the location for each record key and return the
pair<record_key,location> for all record keys already
* present and drop the record keys if not present
*/
- private JavaPairRDD<HoodieKey, String> lookupIndex(
- JavaPairRDD<String, String> partitionRecordKeyPairRDD, final
JavaSparkContext
- jsc, final HoodieTable hoodieTable) {
+ private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
+ JavaPairRDD<String, String> partitionRecordKeyPairRDD, final
JavaSparkContext jsc,
+ final HoodieTable hoodieTable) {
// Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition =
partitionRecordKeyPairRDD.countByKey();
List<String> affectedPartitionPathList = new
ArrayList<>(recordsPerPartition.keySet());
@@ -157,7 +160,7 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
int safeParallelism = computeSafeParallelism(recordsPerPartition,
comparisonsPerFileGroup);
int joinParallelism =
determineParallelism(partitionRecordKeyPairRDD.partitions().size(),
safeParallelism);
return findMatchingFilesForRecordKeys(partitionToFileInfo,
partitionRecordKeyPairRDD, joinParallelism,
- hoodieTable.getMetaClient(), comparisonsPerFileGroup);
+ hoodieTable, comparisonsPerFileGroup);
}
/**
@@ -178,7 +181,7 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
partitionToFileInfo.entrySet().stream().forEach(e -> {
for (BloomIndexFileInfo fileInfo : e.getValue()) {
//each file needs to be compared against all the records coming into
the partition
- fileToComparisons.put(fileInfo.getFileName(),
recordsPerPartition.get(e.getKey()));
+ fileToComparisons.put(fileInfo.getFileId(),
recordsPerPartition.get(e.getKey()));
}
});
}
@@ -227,35 +230,35 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
- List<Tuple2<String, HoodieDataFile>> dataFilesList = jsc
- .parallelize(partitions, Math.max(partitions.size(),
1)).flatMapToPair(partitionPath -> {
+ List<Pair<String, String>> partitionPathFileIDList = jsc
+ .parallelize(partitions, Math.max(partitions.size(), 1))
+ .flatMap(partitionPath -> {
java.util.Optional<HoodieInstant> latestCommitTime =
hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
- List<Tuple2<String, HoodieDataFile>> filteredFiles = new
ArrayList<>();
+ List<Pair<String, String>> filteredFiles = new ArrayList<>();
if (latestCommitTime.isPresent()) {
filteredFiles = hoodieTable.getROFileSystemView()
.getLatestDataFilesBeforeOrOn(partitionPath,
latestCommitTime.get().getTimestamp())
- .map(f -> new Tuple2<>(partitionPath, f)).collect(toList());
+ .map(f -> Pair.of(partitionPath,
f.getFileId())).collect(toList());
}
return filteredFiles.iterator();
}).collect();
if (config.getBloomIndexPruneByRanges()) {
// also obtain file ranges, if range pruning is enabled
- return jsc.parallelize(dataFilesList, Math.max(dataFilesList.size(),
1)).mapToPair(ft -> {
+ return jsc.parallelize(partitionPathFileIDList,
Math.max(partitionPathFileIDList.size(), 1)).mapToPair(pf -> {
try {
- String[] minMaxKeys = ParquetUtils
- .readMinMaxRecordKeys(hoodieTable.getHadoopConf(), new
Path(ft._2().getPath()));
- return new Tuple2<>(ft._1(),
- new BloomIndexFileInfo(ft._2().getFileName(), minMaxKeys[0],
minMaxKeys[1]));
+ HoodieRangeInfoHandle<T> rangeInfoHandle = new
HoodieRangeInfoHandle<T>(config, hoodieTable, pf);
+ String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+ return new Tuple2<>(pf.getKey(), new
BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
- logger.warn("Unable to find range metadata in file :" + ft._2());
- return new Tuple2<>(ft._1(), new
BloomIndexFileInfo(ft._2().getFileName()));
+ logger.warn("Unable to find range metadata in file :" + pf);
+ return new Tuple2<>(pf.getKey(), new
BloomIndexFileInfo(pf.getValue()));
}
}).collect();
} else {
- return dataFilesList.stream()
- .map(ft -> new Tuple2<>(ft._1(), new
BloomIndexFileInfo(ft._2().getFileName())))
+ return partitionPathFileIDList.stream()
+ .map(pf -> new Tuple2<>(pf.getKey(), new
BloomIndexFileInfo(pf.getValue())))
.collect(toList());
}
}
@@ -324,9 +327,9 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
* parallelism for tagging location
*/
@VisibleForTesting
- JavaPairRDD<HoodieKey, String> findMatchingFilesForRecordKeys(
+ JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
- JavaPairRDD<String, String> partitionRecordKeyPairRDD, int
shuffleParallelism, HoodieTableMetaClient metaClient,
+ JavaPairRDD<String, String> partitionRecordKeyPairRDD, int
shuffleParallelism, HoodieTable hoodieTable,
Map<String, Long> fileGroupToComparisons) {
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo,
partitionRecordKeyPairRDD);
@@ -347,17 +350,18 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
}
return fileComparisonsRDD
- .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(metaClient,
config.getBasePath()), true)
+ .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable,
config), true)
.flatMap(List::iterator)
.filter(lr -> lr.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult ->
lookupResult.getMatchingRecordKeys().stream()
.map(recordKey -> new Tuple2<>(new HoodieKey(recordKey,
lookupResult.getPartitionPath()),
- lookupResult.getFileName()))
+ new HoodieRecordLocation(lookupResult.getBaseInstantTime(),
lookupResult.getFileId())))
.collect(Collectors.toList())
.iterator());
}
- HoodieRecord<T> getTaggedRecord(HoodieRecord<T> inputRecord,
org.apache.spark.api.java.Optional<String> location) {
+ HoodieRecord<T> getTaggedRecord(HoodieRecord<T> inputRecord,
+ org.apache.spark.api.java.Optional<HoodieRecordLocation> location) {
HoodieRecord<T> record = inputRecord;
if (location.isPresent()) {
// When you have a record in multiple files in the same partition, then
rowKeyRecordPairRDD
@@ -366,11 +370,7 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
// currentLocation 2 times and it will fail the second time. So creating
a new in memory
// copy of the hoodie record.
record = new HoodieRecord<>(inputRecord);
- String filename = location.get();
- if (filename != null && !filename.isEmpty()) {
- record.setCurrentLocation(new
HoodieRecordLocation(FSUtils.getCommitTime(filename),
- FSUtils.getFileId(filename)));
- }
+ record.setCurrentLocation(location.get());
}
return record;
}
@@ -379,10 +379,9 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
* Tag the <rowKey, filename> back to the original HoodieRecord RDD.
*/
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
- JavaPairRDD<HoodieKey, String> keyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> recordRDD) {
+ JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> recordRDD) {
JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD = recordRDD
.mapToPair(record -> new Tuple2<>(record.getKey(), record));
-
// Here as the recordRDD might have more data than rowKeyRDD (some
rowKeys' fileId is null),
// so we do left outer join.
return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values().map(v1
-> getTaggedRecord(v1._1, v1._2));
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
index 381e745..3dd3df5 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
@@ -18,23 +18,18 @@
package com.uber.hoodie.index.bloom;
-import com.uber.hoodie.common.BloomFilter;
import com.uber.hoodie.common.model.HoodieKey;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
-import com.uber.hoodie.common.util.HoodieTimer;
-import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.func.LazyIterableIterator;
+import com.uber.hoodie.io.HoodieKeyLookupHandle;
+import com.uber.hoodie.io.HoodieKeyLookupHandle.KeyLookupResult;
+import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
@@ -43,150 +38,69 @@ import scala.Tuple2;
* actual files
*/
public class HoodieBloomIndexCheckFunction implements
- Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
- Iterator<List<KeyLookupResult>>> {
+ Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<KeyLookupResult>>> {
- private static Logger logger =
LogManager.getLogger(HoodieBloomIndexCheckFunction.class);
+ private final HoodieTable hoodieTable;
- private final String basePath;
+ private final HoodieWriteConfig config;
- private final HoodieTableMetaClient metaClient;
-
- public HoodieBloomIndexCheckFunction(HoodieTableMetaClient metaClient,
String basePath) {
- this.metaClient = metaClient;
- this.basePath = basePath;
- }
-
- /**
- * Given a list of row keys and one file, return only row keys existing in
that file.
- */
- public static List<String> checkCandidatesAgainstFile(Configuration
configuration,
- List<String> candidateRecordKeys, Path filePath) throws
HoodieIndexException {
- List<String> foundRecordKeys = new ArrayList<>();
- try {
- // Load all rowKeys from the file, to double-confirm
- if (!candidateRecordKeys.isEmpty()) {
- HoodieTimer timer = new HoodieTimer().startTimer();
- Set<String> fileRowKeys =
ParquetUtils.filterParquetRowKeys(configuration, filePath,
- new HashSet<>(candidateRecordKeys));
- foundRecordKeys.addAll(fileRowKeys);
- logger.info(String.format("Checked keys against file %s, in %d ms.
#candidates (%d) #found (%d)", filePath,
- timer.endTimer(), candidateRecordKeys.size(),
foundRecordKeys.size()));
- if (logger.isDebugEnabled()) {
- logger.debug("Keys matching for file " + filePath + " => " +
foundRecordKeys);
- }
- }
- } catch (Exception e) {
- throw new HoodieIndexException("Error checking candidate keys against
file.", e);
- }
- return foundRecordKeys;
+ public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable,
HoodieWriteConfig config) {
+ this.hoodieTable = hoodieTable;
+ this.config = config;
}
@Override
public Iterator<List<KeyLookupResult>> call(Integer partition,
- Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr)
- throws Exception {
+ Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
}
- class LazyKeyCheckIterator extends
- LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
+ class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String,
HoodieKey>, List<KeyLookupResult>> {
- private List<String> candidateRecordKeys;
-
- private BloomFilter bloomFilter;
-
- private String currentFile;
-
- private String currentPartitionPath;
-
- private long totalKeysChecked;
+ private HoodieKeyLookupHandle keyLookupHandle;
LazyKeyCheckIterator(
Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
super(filePartitionRecordKeyTripletItr);
- currentFile = null;
- candidateRecordKeys = new ArrayList<>();
- bloomFilter = null;
- currentPartitionPath = null;
- totalKeysChecked = 0;
}
@Override
protected void start() {
}
- private void initState(String fileName, String partitionPath) throws
HoodieIndexException {
- try {
- Path filePath = new Path(basePath + "/" + partitionPath + "/" +
fileName);
- HoodieTimer timer = new HoodieTimer().startTimer();
- bloomFilter =
ParquetUtils.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(),
filePath);
- logger.info(String.format("Read bloom filter from %s/%s in %d ms",
partitionPath, fileName, timer.endTimer()));
- candidateRecordKeys = new ArrayList<>();
- currentFile = fileName;
- currentPartitionPath = partitionPath;
- totalKeysChecked = 0;
- } catch (Exception e) {
- throw new HoodieIndexException("Error checking candidate keys against
file.", e);
- }
- }
-
- // check record key against bloom filter of current file & add to possible
keys if needed
- private void checkAndAddCandidates(String recordKey) {
- if (bloomFilter.mightContain(recordKey)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Record key " + recordKey + " matches bloom filter in
file " + currentPartitionPath
- + "/" + currentFile);
- }
- candidateRecordKeys.add(recordKey);
- }
- totalKeysChecked++;
- }
-
- private List<String> checkAgainstCurrentFile() {
- Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" +
currentFile);
- if (logger.isDebugEnabled()) {
- logger.debug("#The candidate row keys for " + filePath + " => " +
candidateRecordKeys);
- }
- List<String> matchingKeys =
checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys,
filePath);
- logger.info(String.format("Total records (%d), bloom filter candidates
(%d)/fp(%d), actual matches (%d)",
- totalKeysChecked, candidateRecordKeys.size(),
candidateRecordKeys.size() - matchingKeys.size(),
- matchingKeys.size()));
- return matchingKeys;
- }
-
@Override
- protected List<KeyLookupResult> computeNext() {
+ protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
- List<KeyLookupResult> ret = new ArrayList<>();
+ List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>();
try {
// process one file in each go.
while (inputItr.hasNext()) {
Tuple2<String, HoodieKey> currentTuple = inputItr.next();
- String fileName = currentTuple._1;
+ String fileId = currentTuple._1;
String partitionPath = currentTuple._2.getPartitionPath();
String recordKey = currentTuple._2.getRecordKey();
+ Pair<String, String> partitionPathFilePair = Pair.of(partitionPath,
fileId);
// lazily init state
- if (currentFile == null) {
- initState(fileName, partitionPath);
+ if (keyLookupHandle == null) {
+ keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable,
partitionPathFilePair);
}
// if continue on current file
- if (fileName.equals(currentFile)) {
- checkAndAddCandidates(recordKey);
+ if
(keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
+ keyLookupHandle.addKey(recordKey);
} else {
// do the actual checking of file & break out
- ret.add(new KeyLookupResult(currentFile, currentPartitionPath,
checkAgainstCurrentFile()));
- initState(fileName, partitionPath);
- checkAndAddCandidates(recordKey);
+ ret.add(keyLookupHandle.getLookupResult());
+ keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable,
partitionPathFilePair);
+ keyLookupHandle.addKey(recordKey);
break;
}
}
// handle case, where we ran out of input, close pending work, update
return val
if (!inputItr.hasNext()) {
- ret.add(new KeyLookupResult(currentFile, currentPartitionPath,
checkAgainstCurrentFile()));
+ ret.add(keyLookupHandle.getLookupResult());
}
} catch (Throwable e) {
if (e instanceof HoodieException) {
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
index 2331729..4ec51d9 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
@@ -21,6 +21,7 @@ package com.uber.hoodie.index.bloom;
import com.google.common.annotations.VisibleForTesting;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
@@ -83,7 +84,7 @@ public class HoodieGlobalBloomIndex<T extends
HoodieRecordPayload> extends Hoodi
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
Map<String, String> indexToPartitionMap = new HashMap<>();
for (Entry<String, List<BloomIndexFileInfo>> entry :
partitionToFileIndexInfo.entrySet()) {
- entry.getValue().forEach(indexFile ->
indexToPartitionMap.put(indexFile.getFileName(), entry.getKey()));
+ entry.getValue().forEach(indexFile ->
indexToPartitionMap.put(indexFile.getFileId(), entry.getKey()));
}
IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges()
@@ -106,7 +107,7 @@ public class HoodieGlobalBloomIndex<T extends
HoodieRecordPayload> extends Hoodi
*/
@Override
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
- JavaPairRDD<HoodieKey, String> keyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> recordRDD) {
+ JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> recordRDD) {
JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java
index c03de08..c675ffc 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java
@@ -50,9 +50,9 @@ class IntervalTreeBasedGlobalIndexFileFilter implements
IndexFileFilter {
allIndexFiles.forEach(indexFile -> {
if (indexFile.hasKeyRanges()) {
indexLookUpTree.insert(new KeyRangeNode(indexFile.getMinRecordKey(),
- indexFile.getMaxRecordKey(), indexFile.getFileName()));
+ indexFile.getMaxRecordKey(), indexFile.getFileId()));
} else {
- filesWithNoRanges.add(indexFile.getFileName());
+ filesWithNoRanges.add(indexFile.getFileId());
}
});
}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java
index 5e34d54..412e778 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java
@@ -49,12 +49,12 @@ class IntervalTreeBasedIndexFileFilter implements
IndexFileFilter {
bloomIndexFiles.forEach(indexFileInfo -> {
if (indexFileInfo.hasKeyRanges()) {
lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(),
- indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileName()));
+ indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileId()));
} else {
if (!partitionToFilesWithNoRanges.containsKey(partition)) {
partitionToFilesWithNoRanges.put(partition, new HashSet<>());
}
-
partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileName());
+
partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileId());
}
});
partitionToFileIndexLookUpTree.put(partition, lookUpTree);
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java
index c5d627d..f904126 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java
@@ -43,7 +43,7 @@ class ListBasedGlobalIndexFileFilter extends
ListBasedIndexFileFilter {
// for each candidate file in partition, that needs to be compared.
for (BloomIndexFileInfo indexInfo : indexInfos) {
if (shouldCompareWithFile(indexInfo, recordKey)) {
- toReturn.add(indexInfo.getFileName());
+ toReturn.add(indexInfo.getFileId());
}
}
}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java
index 353f22d..b99280b 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java
@@ -48,7 +48,7 @@ class ListBasedIndexFileFilter implements IndexFileFilter {
// for each candidate file in partition, that needs to be compared.
for (BloomIndexFileInfo indexInfo : indexInfos) {
if (shouldCompareWithFile(indexInfo, recordKey)) {
- toReturn.add(indexInfo.getFileName());
+ toReturn.add(indexInfo.getFileId());
}
}
}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
index 87f3ba5..4eb9443 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
@@ -29,6 +29,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.ReflectionUtils;
+import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException;
@@ -60,7 +61,6 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
-
import scala.Tuple2;
/**
@@ -123,9 +123,8 @@ public class HBaseIndex<T extends HoodieRecordPayload>
extends HoodieIndex<T> {
}
@Override
- public JavaPairRDD<HoodieKey, Optional<String>>
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+ public JavaPairRDD<HoodieKey, Optional<Pair<String, String>>>
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
- //TODO : Change/Remove filterExists in HoodieReadClient() and revisit
throw new UnsupportedOperationException("HBase index does not implement
check exist");
}
@@ -297,7 +296,7 @@ public class HBaseIndex<T extends HoodieRecordPayload>
extends HoodieIndex<T> {
}
Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN,
- Bytes.toBytes(loc.get().getCommitTime()));
+ Bytes.toBytes(loc.get().getInstantTime()));
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN,
Bytes.toBytes(loc.get().getFileId()));
put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN,
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index 1b4081c..cc98f56 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -59,7 +59,7 @@ import org.apache.spark.util.SizeEstimator;
/**
* IO Operation to append data onto an existing file.
*/
-public class HoodieAppendHandle<T extends HoodieRecordPayload> extends
HoodieIOHandle<T> {
+public class HoodieAppendHandle<T extends HoodieRecordPayload> extends
HoodieWriteHandle<T> {
private static Logger logger =
LogManager.getLogger(HoodieAppendHandle.class);
// This acts as the sequenceID for records written
@@ -114,7 +114,7 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload> extends HoodieIOH
RealtimeView rtView = hoodieTable.getRTFileSystemView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath,
fileId);
// Set the base commit time as the current commitTime for new inserts
into log files
- String baseInstantTime = commitTime;
+ String baseInstantTime = instantTime;
if (fileSlice.isPresent()) {
baseInstantTime = fileSlice.get().getBaseInstantTime();
} else {
@@ -134,11 +134,11 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload> extends HoodieIOH
((HoodieDeltaWriteStat)
writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
((HoodieDeltaWriteStat)
writeStatus.getStat()).setLogOffset(writer.getCurrentSize());
} catch (Exception e) {
- logger.error("Error in update task at commit " + commitTime, e);
+ logger.error("Error in update task at commit " + instantTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException(
"Failed to initialize HoodieAppendHandle for FileId: " + fileId +
" on commit "
- + commitTime + " on HDFS path " +
hoodieTable.getMetaClient().getBasePath()
+ + instantTime + " on HDFS path " +
hoodieTable.getMetaClient().getBasePath()
+ partitionPath, e);
}
Path path = new Path(partitionPath, writer.getLogFile().getFileName());
@@ -154,13 +154,13 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload> extends HoodieIOH
if (avroRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata
in schema
avroRecord = Optional.of(rewriteRecord((GenericRecord)
avroRecord.get()));
- String seqId = HoodieRecord.generateSequenceId(commitTime,
TaskContext.getPartitionId(),
+ String seqId = HoodieRecord.generateSequenceId(instantTime,
TaskContext.getPartitionId(),
recordIndex.getAndIncrement());
HoodieAvroUtils
.addHoodieKeyToRecord((GenericRecord) avroRecord.get(),
hoodieRecord.getRecordKey(),
hoodieRecord.getPartitionPath(), fileId);
HoodieAvroUtils
- .addCommitMetadataToRecord((GenericRecord) avroRecord.get(),
commitTime, seqId);
+ .addCommitMetadataToRecord((GenericRecord) avroRecord.get(),
instantTime, seqId);
// If currentLocation is present, then this is an update
if (hoodieRecord.getCurrentLocation() != null) {
updatedRecordsWritten++;
@@ -200,7 +200,7 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload> extends HoodieIOH
private void doAppend(Map<HoodieLogBlock.HeaderMetadataType, String> header)
{
try {
- header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
writerSchema.toString());
if (recordList.size() > 0) {
writer = writer.appendBlock(new HoodieAvroDataBlock(recordList,
header));
@@ -286,7 +286,7 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload> extends HoodieIOH
private void writeToBuffer(HoodieRecord<T> record) {
// update the new location of the record, so we know where to find it next
- record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
+ record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
Optional<IndexedRecord> indexedRecord = getIndexedRecord(record);
if (indexedRecord.isPresent()) {
recordList.add(indexedRecord.get());
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
index 0b75427..cc4596e 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
@@ -41,7 +41,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
-public class HoodieCreateHandle<T extends HoodieRecordPayload> extends
HoodieIOHandle<T> {
+public class HoodieCreateHandle<T extends HoodieRecordPayload> extends
HoodieWriteHandle<T> {
private static Logger logger =
LogManager.getLogger(HoodieCreateHandle.class);
@@ -101,7 +101,7 @@ public class HoodieCreateHandle<T extends
HoodieRecordPayload> extends HoodieIOH
IndexedRecord recordWithMetadataInSchema =
rewriteRecord((GenericRecord) avroRecord.get());
storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema,
record);
// update the new location of record, so we know where to find it next
- record.setNewLocation(new HoodieRecordLocation(commitTime,
writeStatus.getFileId()));
+ record.setNewLocation(new HoodieRecordLocation(instantTime,
writeStatus.getFileId()));
recordsWritten++;
insertRecordsWritten++;
} else {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
index 8fd0391..89d0aa2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
@@ -18,167 +18,25 @@
package com.uber.hoodie.io;
-import com.uber.hoodie.WriteStatus;
-import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
-import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
-import com.uber.hoodie.common.util.FSUtils;
-import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
-import com.uber.hoodie.common.util.HoodieAvroUtils;
-import com.uber.hoodie.common.util.HoodieTimer;
-import com.uber.hoodie.common.util.NoOpConsistencyGuard;
-import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.exception.HoodieException;
-import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
-import java.io.IOException;
-import java.util.Optional;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
- private static Logger logger = LogManager.getLogger(HoodieIOHandle.class);
- protected final String commitTime;
- protected final String fileId;
- protected final String writeToken;
+ protected final String instantTime;
protected final HoodieWriteConfig config;
protected final FileSystem fs;
protected final HoodieTable<T> hoodieTable;
- protected final Schema originalSchema;
- protected final Schema writerSchema;
- protected HoodieTimer timer;
- protected final WriteStatus writeStatus;
- public HoodieIOHandle(HoodieWriteConfig config, String commitTime, String
fileId,
- HoodieTable<T> hoodieTable) {
- this.commitTime = commitTime;
- this.fileId = fileId;
- this.writeToken = makeSparkWriteToken();
+ HoodieIOHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T>
hoodieTable) {
+ this.instantTime = instantTime;
this.config = config;
- this.fs = getFileSystem(hoodieTable, config);
this.hoodieTable = hoodieTable;
- this.originalSchema = new Schema.Parser().parse(config.getSchema());
- this.writerSchema = createHoodieWriteSchema(originalSchema);
- this.timer = new HoodieTimer().startTimer();
- this.writeStatus = (WriteStatus)
ReflectionUtils.loadClass(config.getWriteStatusClassName(),
- !hoodieTable.getIndex().isImplicitWithStorage(),
- config.getWriteStatusFailureFraction());
+ this.fs = getFileSystem();
}
- private static FileSystem getFileSystem(HoodieTable hoodieTable,
HoodieWriteConfig config) {
- return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(),
config.isConsistencyCheckEnabled()
- ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
- config.getMaxConsistencyChecks(),
config.getInitialConsistencyCheckIntervalMs(),
- config.getMaxConsistencyCheckIntervalMs()) : new
NoOpConsistencyGuard());
- }
-
- /**
- * Generate a write token based on the currently running spark task and its
place in the spark dag.
- */
- private static String makeSparkWriteToken() {
- return FSUtils.makeWriteToken(TaskContext.getPartitionId(),
TaskContext.get().stageId(),
- TaskContext.get().taskAttemptId());
- }
-
- public static Schema createHoodieWriteSchema(Schema originalSchema) {
- return HoodieAvroUtils.addMetadataFields(originalSchema);
- }
-
- public Path makeNewPath(String partitionPath) {
- Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
- try {
- fs.mkdirs(path); // create a new partition as needed.
- } catch (IOException e) {
- throw new HoodieIOException("Failed to make dir " + path, e);
- }
-
- return new Path(path.toString(), FSUtils.makeDataFileName(commitTime,
writeToken, fileId));
- }
-
- /**
- * Creates an empty marker file corresponding to storage writer path
- * @param partitionPath Partition path
- */
- protected void createMarkerFile(String partitionPath) {
- Path markerPath = makeNewMarkerPath(partitionPath);
- try {
- logger.info("Creating Marker Path=" + markerPath);
- fs.create(markerPath, false).close();
- } catch (IOException e) {
- throw new HoodieException("Failed to create marker file " + markerPath,
e);
- }
- }
-
- /**
- * THe marker path will be
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename
- * @param partitionPath
- * @return
- */
- private Path makeNewMarkerPath(String partitionPath) {
- Path markerRootPath = new
Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime));
- Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
- try {
- fs.mkdirs(path); // create a new partition as needed.
- } catch (IOException e) {
- throw new HoodieIOException("Failed to make dir " + path, e);
- }
- return new Path(path.toString(), FSUtils.makeMarkerFile(commitTime,
writeToken, fileId));
- }
-
- public Schema getWriterSchema() {
- return writerSchema;
- }
-
- /**
- * Determines whether we can accept the incoming records, into the current
file, depending on
- * <p>
- * - Whether it belongs to the same partitionPath as existing records -
Whether the current file
- * written bytes lt max file size
- */
- public boolean canWrite(HoodieRecord record) {
- return false;
- }
-
- /**
- * Perform the actual writing of the given record into the backing file.
- */
- public void write(HoodieRecord record, Optional<IndexedRecord> insertValue) {
- // NO_OP
- }
-
- /**
- * Perform the actual writing of the given record into the backing file.
- */
- public void write(HoodieRecord record, Optional<IndexedRecord> avroRecord,
Optional<Exception> exception) {
- Optional recordMetadata = record.getData().getMetadata();
- if (exception.isPresent() && exception.get() instanceof Throwable) {
- // Not throwing exception from here, since we don't want to fail the
entire job for a single record
- writeStatus.markFailure(record, exception.get(), recordMetadata);
- logger.error("Error writing record " + record, exception.get());
- } else {
- write(record, avroRecord);
- }
- }
-
- /**
- * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata
fields
- * @param record
- * @return
- */
- protected GenericRecord rewriteRecord(GenericRecord record) {
- return HoodieAvroUtils.rewriteRecord(record, writerSchema);
- }
-
- public abstract WriteStatus close();
-
- public abstract WriteStatus getWriteStatus();
+ protected abstract FileSystem getFileSystem();
}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieKeyLookupHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieKeyLookupHandle.java
new file mode 100644
index 0000000..c5bdbf6
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieKeyLookupHandle.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io;
+
+import com.uber.hoodie.common.BloomFilter;
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.model.HoodieTableType;
+import com.uber.hoodie.common.util.HoodieTimer;
+import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.exception.HoodieIndexException;
+import com.uber.hoodie.table.HoodieTable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends
HoodieReadHandle<T> {
+
+ private static Logger logger =
LogManager.getLogger(HoodieKeyLookupHandle.class);
+
+ private final HoodieTableType tableType;
+
+ private final BloomFilter bloomFilter;
+
+ private final List<String> candidateRecordKeys;
+
+ private long totalKeysChecked;
+
+ public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T>
hoodieTable,
+ Pair<String, String> partitionPathFilePair) {
+ super(config, null, hoodieTable, partitionPathFilePair);
+ this.tableType = hoodieTable.getMetaClient().getTableType();
+ this.candidateRecordKeys = new ArrayList<>();
+ this.totalKeysChecked = 0;
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ this.bloomFilter =
ParquetUtils.readBloomFilterFromParquetMetadata(hoodieTable.getHadoopConf(),
+ new Path(getLatestDataFile().getPath()));
+ logger.info(String.format("Read bloom filter from %s in %d ms",
partitionPathFilePair, timer.endTimer()));
+ }
+
+ /**
+ * Given a list of row keys and one file, return only row keys existing in
that file.
+ */
+ public static List<String> checkCandidatesAgainstFile(Configuration
configuration,
+ List<String> candidateRecordKeys, Path filePath) throws
HoodieIndexException {
+ List<String> foundRecordKeys = new ArrayList<>();
+ try {
+ // Load all rowKeys from the file, to double-confirm
+ if (!candidateRecordKeys.isEmpty()) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Set<String> fileRowKeys =
ParquetUtils.filterParquetRowKeys(configuration, filePath,
+ new HashSet<>(candidateRecordKeys));
+ foundRecordKeys.addAll(fileRowKeys);
+ logger.info(String.format("Checked keys against file %s, in %d ms.
#candidates (%d) #found (%d)", filePath,
+ timer.endTimer(), candidateRecordKeys.size(),
foundRecordKeys.size()));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Keys matching for file " + filePath + " => " +
foundRecordKeys);
+ }
+ }
+ } catch (Exception e) {
+ throw new HoodieIndexException("Error checking candidate keys against
file.", e);
+ }
+ return foundRecordKeys;
+ }
+
+ /**
+ * Adds the key for look up.
+ */
+ public void addKey(String recordKey) {
+ // check record key against bloom filter of current file & add to possible
keys if needed
+ if (bloomFilter.mightContain(recordKey)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Record key " + recordKey + " matches bloom filter in "
+ partitionPathFilePair);
+ }
+ candidateRecordKeys.add(recordKey);
+ }
+ totalKeysChecked++;
+ }
+
+ /**
+ * Of all the keys, that were added, return a list of keys that were
actually found in the file group.
+ */
+ public KeyLookupResult getLookupResult() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("#The candidate row keys for " + partitionPathFilePair + "
=> " + candidateRecordKeys);
+ }
+
+ HoodieDataFile dataFile = getLatestDataFile();
+ List<String> matchingKeys =
checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys,
+ new Path(dataFile.getPath()));
+ logger.info(String.format("Total records (%d), bloom filter candidates
(%d)/fp(%d), actual matches (%d)",
+ totalKeysChecked, candidateRecordKeys.size(),
candidateRecordKeys.size() - matchingKeys.size(),
+ matchingKeys.size()));
+ return new KeyLookupResult(partitionPathFilePair.getRight(),
partitionPathFilePair.getLeft(),
+ dataFile.getCommitTime(), matchingKeys);
+ }
+
+ /**
+ * Encapsulates the result from a key lookup
+ */
+ public static class KeyLookupResult {
+
+ private final String fileId;
+ private final String baseInstantTime;
+ private final List<String> matchingRecordKeys;
+ private final String partitionPath;
+
+ public KeyLookupResult(String fileId, String partitionPath, String
baseInstantTime,
+ List<String> matchingRecordKeys) {
+ this.fileId = fileId;
+ this.partitionPath = partitionPath;
+ this.baseInstantTime = baseInstantTime;
+ this.matchingRecordKeys = matchingRecordKeys;
+ }
+
+ public String getFileId() {
+ return fileId;
+ }
+
+ public String getBaseInstantTime() {
+ return baseInstantTime;
+ }
+
+ public String getPartitionPath() {
+ return partitionPath;
+ }
+
+ public List<String> getMatchingRecordKeys() {
+ return matchingRecordKeys;
+ }
+ }
+}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index f94e293..b22bcb3 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -28,6 +28,7 @@ import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
import com.uber.hoodie.common.util.DefaultSizeEstimator;
import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -42,6 +43,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
@@ -50,7 +52,7 @@ import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
@SuppressWarnings("Duplicates")
-public class HoodieMergeHandle<T extends HoodieRecordPayload> extends
HoodieIOHandle<T> {
+public class HoodieMergeHandle<T extends HoodieRecordPayload> extends
HoodieWriteHandle<T> {
private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class);
@@ -85,6 +87,64 @@ public class HoodieMergeHandle<T extends
HoodieRecordPayload> extends HoodieIOHa
.getPartitionPath(), dataFileToBeMerged);
}
+
+ public static Schema createHoodieWriteSchema(Schema originalSchema) {
+ return HoodieAvroUtils.addMetadataFields(originalSchema);
+ }
+
+ public Path makeNewPath(String partitionPath) {
+ Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
+ try {
+ fs.mkdirs(path); // create a new partition as needed.
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to make dir " + path, e);
+ }
+
+ return new Path(path.toString(), FSUtils.makeDataFileName(instantTime,
writeToken, fileId));
+ }
+
+ public Schema getWriterSchema() {
+ return writerSchema;
+ }
+
+ /**
+ * Determines whether we can accept the incoming records, into the current
file, depending on
+ * <p>
+ * - Whether it belongs to the same partitionPath as existing records -
Whether the current file written bytes lt max
+ * file size
+ */
+ public boolean canWrite(HoodieRecord record) {
+ return false;
+ }
+
+ /**
+ * Perform the actual writing of the given record into the backing file.
+ */
+ public void write(HoodieRecord record, Optional<IndexedRecord> insertValue) {
+ // NO_OP
+ }
+
+ /**
+ * Perform the actual writing of the given record into the backing file.
+ */
+ public void write(HoodieRecord record, Optional<IndexedRecord> avroRecord,
Optional<Exception> exception) {
+ Optional recordMetadata = record.getData().getMetadata();
+ if (exception.isPresent() && exception.get() instanceof Throwable) {
+ // Not throwing exception from here, since we don't want to fail the
entire job for a single record
+ writeStatus.markFailure(record, exception.get(), recordMetadata);
+ logger.error("Error writing record " + record, exception.get());
+ } else {
+ write(record, avroRecord);
+ }
+ }
+
+ /**
+ * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata
fields
+ */
+ protected GenericRecord rewriteRecord(GenericRecord record) {
+ return HoodieAvroUtils.rewriteRecord(record, writerSchema);
+ }
+
/**
* Extract old file path, initialize StorageWriter and WriteStatus
*/
@@ -95,14 +155,14 @@ public class HoodieMergeHandle<T extends
HoodieRecordPayload> extends HoodieIOHa
String latestValidFilePath = dataFileToBeMerged.getFileName();
writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
- HoodiePartitionMetadata partitionMetadata = new
HoodiePartitionMetadata(fs, commitTime,
+ HoodiePartitionMetadata partitionMetadata = new
HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()),
FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(TaskContext.getPartitionId());
oldFilePath = new Path(
config.getBasePath() + "/" + partitionPath + "/" +
latestValidFilePath);
- String relativePath = new Path((partitionPath.isEmpty() ? "" :
partitionPath + "/") + FSUtils
- .makeDataFileName(commitTime, writeToken, fileId)).toString();
+ String relativePath = new Path((partitionPath.isEmpty() ? "" :
partitionPath + "/")
+ + FSUtils.makeDataFileName(instantTime, writeToken,
fileId)).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
logger.info(String
@@ -120,13 +180,13 @@ public class HoodieMergeHandle<T extends
HoodieRecordPayload> extends HoodieIOHa
// Create the writer for writing the new version file
storageWriter = HoodieStorageWriterFactory
- .getStorageWriter(commitTime, newFilePath, hoodieTable, config,
writerSchema);
+ .getStorageWriter(instantTime, newFilePath, hoodieTable, config,
writerSchema);
} catch (IOException io) {
- logger.error("Error in update task at commit " + commitTime, io);
+ logger.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
throw new HoodieUpsertException(
"Failed to initialize HoodieUpdateHandle for FileId: " + fileId + "
on commit "
- + commitTime + " on path " +
hoodieTable.getMetaClient().getBasePath(), io);
+ + instantTime + " on path " +
hoodieTable.getMetaClient().getBasePath(), io);
}
}
@@ -148,7 +208,7 @@ public class HoodieMergeHandle<T extends
HoodieRecordPayload> extends HoodieIOHa
partitionPath = record.getPartitionPath();
keyToNewRecords.put(record.getRecordKey(), record);
// update the new location of the record, so we know where to find it
next
- record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
+ record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
}
logger.info("Number of entries in MemoryBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieRangeInfoHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieRangeInfoHandle.java
new file mode 100644
index 0000000..1f2369e
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieRangeInfoHandle.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io;
+
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.table.HoodieTable;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Extract range information for a given file slice
+ */
+public class HoodieRangeInfoHandle<T extends HoodieRecordPayload> extends
HoodieReadHandle<T> {
+
+ public HoodieRangeInfoHandle(HoodieWriteConfig config, HoodieTable<T>
hoodieTable,
+ Pair<String, String> partitionPathFilePair) {
+ super(config, null, hoodieTable, partitionPathFilePair);
+ }
+
+ public String[] getMinMaxKeys() {
+ HoodieDataFile dataFile = getLatestDataFile();
+ return ParquetUtils.readMinMaxRecordKeys(hoodieTable.getHadoopConf(), new
Path(dataFile.getPath()));
+ }
+}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
new file mode 100644
index 0000000..e0d3323
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io;
+
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.table.HoodieTable;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * Base class for read operations done logically on the file group.
+ */
+public abstract class HoodieReadHandle<T extends HoodieRecordPayload> extends
HoodieIOHandle {
+
+ protected final Pair<String, String> partitionPathFilePair;
+
+ public HoodieReadHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T> hoodieTable,
+ Pair<String, String> partitionPathFilePair) {
+ super(config, instantTime, hoodieTable);
+ this.partitionPathFilePair = partitionPathFilePair;
+ }
+
+ @Override
+ protected FileSystem getFileSystem() {
+ return hoodieTable.getMetaClient().getFs();
+ }
+
+
+ public Pair<String, String> getPartitionPathFilePair() {
+ return partitionPathFilePair;
+ }
+
+ public String getFileId() {
+ return partitionPathFilePair.getRight();
+ }
+
+ protected HoodieDataFile getLatestDataFile() {
+ return hoodieTable.getROFileSystemView()
+ .getLatestDataFile(partitionPathFilePair.getLeft(),
partitionPathFilePair.getRight()).get();
+ }
+}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
similarity index 84%
copy from hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
copy to hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
index 8fd0391..1adeecc 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
@@ -43,29 +43,23 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
+/**
+ * Base class for all write operations logically performed at the file group
level.
+ */
+public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends
HoodieIOHandle {
-public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
-
- private static Logger logger = LogManager.getLogger(HoodieIOHandle.class);
- protected final String commitTime;
- protected final String fileId;
- protected final String writeToken;
- protected final HoodieWriteConfig config;
- protected final FileSystem fs;
- protected final HoodieTable<T> hoodieTable;
+ private static Logger logger = LogManager.getLogger(HoodieWriteHandle.class);
protected final Schema originalSchema;
protected final Schema writerSchema;
protected HoodieTimer timer;
protected final WriteStatus writeStatus;
+ protected final String fileId;
+ protected final String writeToken;
- public HoodieIOHandle(HoodieWriteConfig config, String commitTime, String
fileId,
- HoodieTable<T> hoodieTable) {
- this.commitTime = commitTime;
+ public HoodieWriteHandle(HoodieWriteConfig config, String instantTime,
String fileId, HoodieTable<T> hoodieTable) {
+ super(config, instantTime, hoodieTable);
this.fileId = fileId;
this.writeToken = makeSparkWriteToken();
- this.config = config;
- this.fs = getFileSystem(hoodieTable, config);
- this.hoodieTable = hoodieTable;
this.originalSchema = new Schema.Parser().parse(config.getSchema());
this.writerSchema = createHoodieWriteSchema(originalSchema);
this.timer = new HoodieTimer().startTimer();
@@ -101,11 +95,12 @@ public abstract class HoodieIOHandle<T extends
HoodieRecordPayload> {
throw new HoodieIOException("Failed to make dir " + path, e);
}
- return new Path(path.toString(), FSUtils.makeDataFileName(commitTime,
writeToken, fileId));
+ return new Path(path.toString(), FSUtils.makeDataFileName(instantTime,
writeToken, fileId));
}
/**
* Creates an empty marker file corresponding to storage writer path
+ *
* @param partitionPath Partition path
*/
protected void createMarkerFile(String partitionPath) {
@@ -120,18 +115,16 @@ public abstract class HoodieIOHandle<T extends
HoodieRecordPayload> {
/**
   * The marker path will be
 <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename
- * @param partitionPath
- * @return
*/
private Path makeNewMarkerPath(String partitionPath) {
- Path markerRootPath = new
Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime));
+ Path markerRootPath = new
Path(hoodieTable.getMetaClient().getMarkerFolderPath(instantTime));
Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
try {
fs.mkdirs(path); // create a new partition as needed.
} catch (IOException e) {
throw new HoodieIOException("Failed to make dir " + path, e);
}
- return new Path(path.toString(), FSUtils.makeMarkerFile(commitTime,
writeToken, fileId));
+ return new Path(path.toString(), FSUtils.makeMarkerFile(instantTime,
writeToken, fileId));
}
public Schema getWriterSchema() {
@@ -141,8 +134,8 @@ public abstract class HoodieIOHandle<T extends
HoodieRecordPayload> {
/**
* Determines whether we can accept the incoming records, into the current
file, depending on
* <p>
- * - Whether it belongs to the same partitionPath as existing records -
Whether the current file
- * written bytes lt max file size
+ * - Whether it belongs to the same partitionPath as existing records -
Whether the current file written bytes lt max
+ * file size
*/
public boolean canWrite(HoodieRecord record) {
return false;
@@ -171,8 +164,6 @@ public abstract class HoodieIOHandle<T extends
HoodieRecordPayload> {
/**
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata
fields
- * @param record
- * @return
*/
protected GenericRecord rewriteRecord(GenericRecord record) {
return HoodieAvroUtils.rewriteRecord(record, writerSchema);
@@ -181,4 +172,12 @@ public abstract class HoodieIOHandle<T extends
HoodieRecordPayload> {
public abstract WriteStatus close();
public abstract WriteStatus getWriteStatus();
+
+ @Override
+ protected FileSystem getFileSystem() {
+ return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(),
config.isConsistencyCheckEnabled()
+ ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
+ config.getMaxConsistencyChecks(),
config.getInitialConsistencyCheckIntervalMs(),
+ config.getMaxConsistencyCheckIntervalMs()) : new
NoOpConsistencyGuard());
+ }
}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index ac81a43..6b9f4f6 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -500,7 +500,7 @@ public class HoodieCopyOnWriteTable<T extends
HoodieRecordPayload> extends Hoodi
/**
* Helper class for a small file's location and its actual size on disk
*/
- class SmallFile implements Serializable {
+ static class SmallFile implements Serializable {
HoodieRecordLocation location;
long sizeBytes;
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java
b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java
index 2739981..a613f3b 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java
@@ -43,7 +43,7 @@ public class WorkloadStat implements Serializable {
}
long addUpdates(HoodieRecordLocation location, long numUpdates) {
- updateLocationToCount.put(location.getFileId(),
Pair.of(location.getCommitTime(), numUpdates));
+ updateLocationToCount.put(location.getFileId(),
Pair.of(location.getInstantTime(), numUpdates));
return this.numUpdates += numUpdates;
}
diff --git
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
index fb1d1ee..ae2e57c 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
@@ -217,7 +217,7 @@ public class TestHoodieClientBase implements Serializable {
for (HoodieRecord rec : taggedRecords) {
assertTrue("Record " + rec + " found with no location.",
rec.isCurrentLocationKnown());
assertEquals("All records should have commit time " + commitTime + ",
since updates were made",
- rec.getCurrentLocation().getCommitTime(), commitTime);
+ rec.getCurrentLocation().getInstantTime(), commitTime);
}
}
diff --git
a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
index 603ef8f..24bb4fa 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
@@ -166,7 +166,7 @@ public class TestHbaseIndex {
assertTrue(javaRDD.filter(record ->
record.isCurrentLocationKnown()).collect().size() == 200);
assertTrue(javaRDD.map(record ->
record.getKey().getRecordKey()).distinct().count() == 200);
assertTrue(javaRDD.filter(
- record -> (record.getCurrentLocation() != null &&
record.getCurrentLocation().getCommitTime()
+ record -> (record.getCurrentLocation() != null &&
record.getCurrentLocation().getInstantTime()
.equals(newCommitTime))).distinct().count() == 200);
}
diff --git
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
index e62760a..f2e0784 100644
---
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
+++
b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
@@ -36,8 +36,10 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.io.HoodieKeyLookupHandle;
import com.uber.hoodie.table.HoodieTable;
import java.io.File;
import java.io.IOException;
@@ -200,10 +202,10 @@ public class TestHoodieBloomIndex {
// no longer sorted, but should have same files.
List<Tuple2<String, BloomIndexFileInfo>> expected = Arrays.asList(
- new Tuple2<>("2016/04/01", new
BloomIndexFileInfo("2_0_20160401010101.parquet")),
- new Tuple2<>("2015/03/12", new
BloomIndexFileInfo("1_0_20150312101010.parquet")),
- new Tuple2<>("2015/03/12", new
BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000")),
- new Tuple2<>("2015/03/12", new
BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003")));
+ new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")),
+ new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")),
+ new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000",
"000")),
+ new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001",
"003")));
assertEquals(expected, filesList);
}
}
@@ -279,7 +281,7 @@ public class TestHoodieBloomIndex {
List<String> uuids = Arrays.asList(record1.getRecordKey(),
record2.getRecordKey(), record3.getRecordKey(),
record4.getRecordKey());
- List<String> results =
HoodieBloomIndexCheckFunction.checkCandidatesAgainstFile(jsc.hadoopConfiguration(),
uuids,
+ List<String> results =
HoodieKeyLookupHandle.checkCandidatesAgainstFile(jsc.hadoopConfiguration(),
uuids,
new Path(basePath + "/2016/01/31/" + filename));
assertEquals(results.size(), 2);
assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
|| results.get(1).equals(
@@ -417,10 +419,11 @@ public class TestHoodieBloomIndex {
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
- JavaPairRDD<HoodieKey, Optional<String>> taggedRecordRDD =
bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
+ JavaPairRDD<HoodieKey, Optional<Pair<String, String>>> taggedRecordRDD =
bloomIndex
+ .fetchRecordLocation(keysRDD, jsc, table);
// Should not find any files
- for (Tuple2<HoodieKey, Optional<String>> record :
taggedRecordRDD.collect()) {
+ for (Tuple2<HoodieKey, Optional<Pair<String, String>>> record :
taggedRecordRDD.collect()) {
assertTrue(!record._2.isPresent());
}
@@ -438,18 +441,16 @@ public class TestHoodieBloomIndex {
taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
// Check results
- for (Tuple2<HoodieKey, Optional<String>> record :
taggedRecordRDD.collect()) {
+ for (Tuple2<HoodieKey, Optional<Pair<String, String>>> record :
taggedRecordRDD.collect()) {
if
(record._1.getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) {
assertTrue(record._2.isPresent());
- Path path1 = new Path(record._2.get());
- assertEquals(FSUtils.getFileId(filename1),
FSUtils.getFileId(path1.getName()));
+ assertEquals(FSUtils.getFileId(filename1), record._2.get().getRight());
} else if
(record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
assertTrue(record._2.isPresent());
- Path path2 = new Path(record._2.get());
if (record._1.getPartitionPath().equals("2015/01/31")) {
- assertEquals(FSUtils.getFileId(filename3),
FSUtils.getFileId(path2.getName()));
+ assertEquals(FSUtils.getFileId(filename3),
record._2.get().getRight());
} else {
- assertEquals(FSUtils.getFileId(filename2),
FSUtils.getFileId(path2.getName()));
+ assertEquals(FSUtils.getFileId(filename2),
record._2.get().getRight());
}
} else if
(record._1.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) {
assertTrue(!record._2.isPresent());
diff --git
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
index 0a812e3..4048a5e 100644
---
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
+++
b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -18,7 +18,11 @@
package com.uber.hoodie.index.bloom;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import com.uber.hoodie.common.HoodieClientTestUtils;
@@ -32,16 +36,17 @@ import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable;
-
import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
-
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
-
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -154,20 +159,20 @@ public class TestHoodieGlobalBloomIndex {
Map<String, BloomIndexFileInfo> filesMap = toFileMap(filesList);
// key ranges checks
-
assertNull(filesMap.get("2016/04/01/2_0_20160401010101.parquet").getMaxRecordKey());
-
assertNull(filesMap.get("2016/04/01/2_0_20160401010101.parquet").getMinRecordKey());
-
assertFalse(filesMap.get("2015/03/12/1_0_20150312101010.parquet").hasKeyRanges());
-
assertNotNull(filesMap.get("2015/03/12/3_0_20150312101010.parquet").getMaxRecordKey());
-
assertNotNull(filesMap.get("2015/03/12/3_0_20150312101010.parquet").getMinRecordKey());
-
assertTrue(filesMap.get("2015/03/12/3_0_20150312101010.parquet").hasKeyRanges());
+ assertNull(filesMap.get("2016/04/01/2").getMaxRecordKey());
+ assertNull(filesMap.get("2016/04/01/2").getMinRecordKey());
+ assertFalse(filesMap.get("2015/03/12/1").hasKeyRanges());
+ assertNotNull(filesMap.get("2015/03/12/3").getMaxRecordKey());
+ assertNotNull(filesMap.get("2015/03/12/3").getMinRecordKey());
+ assertTrue(filesMap.get("2015/03/12/3").hasKeyRanges());
Map<String, BloomIndexFileInfo> expected = new HashMap<>();
- expected.put("2016/04/01/2_0_20160401010101.parquet", new
BloomIndexFileInfo("2_0_20160401010101.parquet"));
- expected.put("2015/03/12/1_0_20150312101010.parquet", new
BloomIndexFileInfo("1_0_20150312101010.parquet"));
- expected.put("2015/03/12/3_0_20150312101010.parquet",
- new BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000"));
- expected.put("2015/03/12/4_0_20150312101010.parquet",
- new BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003"));
+ expected.put("2016/04/01/2", new BloomIndexFileInfo("2"));
+ expected.put("2015/03/12/1", new BloomIndexFileInfo("1"));
+ expected.put("2015/03/12/3",
+ new BloomIndexFileInfo("3", "000", "000"));
+ expected.put("2015/03/12/4",
+ new BloomIndexFileInfo("4", "001", "003"));
assertEquals(expected, filesMap);
}
@@ -300,7 +305,7 @@ public class TestHoodieGlobalBloomIndex {
private Map<String, BloomIndexFileInfo> toFileMap(List<Tuple2<String,
BloomIndexFileInfo>> filesList) {
Map<String, BloomIndexFileInfo> filesMap = new HashMap<>();
for (Tuple2<String, BloomIndexFileInfo> t : filesList) {
- filesMap.put(t._1() + "/" + t._2().getFileName(), t._2());
+ filesMap.put(t._1() + "/" + t._2().getFileId(), t._2());
}
return filesMap;
}
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java
index ddf9777..75f41ef 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java
@@ -27,11 +27,11 @@ import java.io.Serializable;
*/
public class HoodieRecordLocation implements Serializable {
- private final String commitTime;
+ private final String instantTime;
private final String fileId;
- public HoodieRecordLocation(String commitTime, String fileId) {
- this.commitTime = commitTime;
+ public HoodieRecordLocation(String instantTime, String fileId) {
+ this.instantTime = instantTime;
this.fileId = fileId;
}
@@ -44,26 +44,26 @@ public class HoodieRecordLocation implements Serializable {
return false;
}
HoodieRecordLocation otherLoc = (HoodieRecordLocation) o;
- return Objects.equal(commitTime, otherLoc.commitTime)
+ return Objects.equal(instantTime, otherLoc.instantTime)
&& Objects.equal(fileId, otherLoc.fileId);
}
@Override
public int hashCode() {
- return Objects.hashCode(commitTime, fileId);
+ return Objects.hashCode(instantTime, fileId);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("HoodieRecordLocation {");
- sb.append("commitTime=").append(commitTime).append(", ");
+ sb.append("instantTime=").append(instantTime).append(", ");
sb.append("fileId=").append(fileId);
sb.append('}');
return sb.toString();
}
- public String getCommitTime() {
- return commitTime;
+ public String getInstantTime() {
+ return instantTime;
}
public String getFileId() {
diff --git
a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
index e42dbcd..fd821e7 100644
---
a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
+++
b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
@@ -346,10 +346,10 @@ public class HoodieTestUtils {
try {
logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new
Path(basePath, partitionPath))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
- .overBaseCommit(location.getCommitTime()).withFs(fs).build();
+ .overBaseCommit(location.getInstantTime()).withFs(fs).build();
Map<HoodieLogBlock.HeaderMetadataType, String> header =
Maps.newHashMap();
- header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME,
location.getCommitTime());
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME,
location.getInstantTime());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
schema.toString());
logWriter.appendBlock(new
HoodieAvroDataBlock(s.getValue().stream().map(r -> {
try {
diff --git
a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
index 572ea4d..1cfb254 100644
---
a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
+++
b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
@@ -151,7 +151,7 @@ public class TestExternalSpillableMap {
assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey());
// compare the member variables of HoodieRecord not set by the constructor
assert
records.get(ikey).getCurrentLocation().getFileId().equals(SpillableMapTestUtils.DUMMY_FILE_ID);
- assert
records.get(ikey).getCurrentLocation().getCommitTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME);
+ assert
records.get(ikey).getCurrentLocation().getInstantTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME);
// test contains
assertTrue(records.containsKey(ikey));