nsivabalan commented on code in PR #8758:
URL: https://github.com/apache/hudi/pull/8758#discussion_r1225438663
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -432,6 +468,7 @@ private boolean initializeFromFilesystem(String
initializationTime, List<Metadat
bulkCommit(commitTimeForPartition, partitionType, records,
fileGroupCount);
metadataMetaClient.reloadActiveTimeline();
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient,
partitionType, true);
+ initMetadataReader();
Review Comment:
I don't think this is required.
If there are no partitions to init, we initialize the reader at L259; and if
there are any to init, we initialize the reader at L434.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object>
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+ // The index to fallback upon when record index is not initialized yet.
+ // This should be a global index like record index so that the behavior of
tagging across partitions is not changed.
+ private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE =
IndexType.GLOBAL_SIMPLE;
+
+ public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
+ int fileGroupSize;
+ try {
+
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+ fileGroupSize =
hoodieTable.getMetadataTable().getNumShards(MetadataPartitionType.RECORD_INDEX);
+ ValidationUtils.checkState(fileGroupSize > 0, "Record index should have
at least one file group");
+ } catch (TableNotFoundException | IllegalStateException e) {
+ // This means that record index has not been initialized.
+ LOG.warn(String.format("Record index not initialized so falling back to
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
Review Comment:
If we don't fail here, how does the user know that the RLI has an issue and
needs manual intervention?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -683,12 +683,19 @@ public void initializeBootstrapDirsIfNotExists() throws
IOException {
initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(),
getFs());
}
+ /**
+ * Reload table config and cache.
+ */
+ public synchronized void reloadTableConfig() {
Review Comment:
I see this is used only in tests. We should not add any APIs to source code
just for test purposes.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -163,16 +160,16 @@ public List<String>
getPartitionPathWithPathPrefixes(List<String> relativePathPr
public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(List<String> keyPrefixes,
String partitionName,
boolean shouldLoadInMemory) {
- // Sort the columns so that keys are looked up in order
+ // Sort the prefixes so that keys are looked up in order
List<String> sortedKeyPrefixes = new ArrayList<>(keyPrefixes);
Collections.sort(sortedKeyPrefixes);
// NOTE: Since we partition records to a particular file-group by full
key, we will have
// to scan all file-groups for all key-prefixes as each of these
might contain some
// records matching the key-prefix
- List<FileSlice> partitionFileSlices =
- HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
- metadataMetaClient, metadataFileSystemView, partitionName);
+ List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
Review Comment:
getRecordsByKeyPrefixes is called from within a Spark task. I am assuming
you made this change with that in mind, i.e. partitionFileSliceMap will live
in the driver and all executors/tasks will share it.
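For illustration, the caching pattern under discussion boils down to a `computeIfAbsent` over a shared map; a minimal self-contained sketch with stand-in types (not the actual Hudi classes -- `listFileSlices` stands in for `HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices`):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: the (expensive) file-slice listing runs at most once per partition;
// later lookups for the same partition reuse the cached result.
public class FileSliceCacheSketch {
  private static final AtomicInteger LISTING_CALLS = new AtomicInteger();
  // ConcurrentHashMap so concurrent callers sharing the map stay safe
  private static final Map<String, List<String>> PARTITION_FILE_SLICE_MAP = new ConcurrentHashMap<>();

  // Stand-in for the real file-slice listing against the metadata table
  static List<String> listFileSlices(String partitionName) {
    LISTING_CALLS.incrementAndGet();
    return Arrays.asList(partitionName + "-slice-0", partitionName + "-slice-1");
  }

  static List<String> getFileSlices(String partitionName) {
    return PARTITION_FILE_SLICE_MAP.computeIfAbsent(partitionName, FileSliceCacheSketch::listFileSlices);
  }

  static int listingCalls() {
    return LISTING_CALLS.get();
  }

  public static void main(String[] args) {
    getFileSlices("record_index");
    getFileSlices("record_index"); // served from the cache, no second listing
    System.out.println(listingCalls()); // prints 1
  }
}
```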
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -193,121 +190,126 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(L
return Collections.emptyIterator();
}
- boolean fullKeys = false;
+ boolean fullKeys = false;
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>>
logRecords =
- readLogRecords(logRecordScanner, sortedKeyPrefixes,
fullKeys, timings);
+ Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
+ readLogRecords(logRecordScanner, sortedKeyPrefixes,
fullKeys, timings);
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
mergedRecords =
- readFromBaseAndMergeWithLogRecords(baseFileReader,
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+ Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords
=
+ readFromBaseAndMergeWithLogRecords(baseFileReader,
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
- LOG.debug(String.format("Metadata read for %s keys took
[baseFileRead, logMerge] %s ms",
- sortedKeyPrefixes.size(), timings));
+ LOG.debug(String.format("Metadata read for %s keys took
[baseFileRead, logMerge] %s ms",
+ sortedKeyPrefixes.size(), timings));
- return mergedRecords.stream()
- .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
- .filter(Objects::nonNull)
- .iterator();
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata
table for " + sortedKeyPrefixes.size() + " key : ", ioe);
- } finally {
- closeReader(readers);
- }
- });
+ return mergedRecords.values().iterator();
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from
metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe);
+ } finally {
+ closeReader(readers);
+ }
+ });
}
@Override
- public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordsByKeys(List<String> keys,
-
String partitionName) {
- // Sort the columns so that keys are looked up in order
- List<String> sortedKeys = new ArrayList<>(keys);
- Collections.sort(sortedKeys);
- Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap =
getPartitionFileSliceToKeysMapping(partitionName, sortedKeys);
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result =
new ArrayList<>();
- AtomicInteger fileSlicesKeysCount = new AtomicInteger();
- partitionFileSliceToKeysMap.forEach((partitionFileSlicePair,
fileSliceKeys) -> {
- Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
- getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
- try {
- List<Long> timings = new ArrayList<>();
- HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
- HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
- if (baseFileReader == null && logRecordScanner == null) {
- return;
- }
+ protected Map<String, HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeys(List<String> keys, String partitionName) {
+ if (keys.isEmpty()) {
+ return Collections.emptyMap();
+ }
- boolean fullKeys = true;
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
- readLogRecords(logRecordScanner, fileSliceKeys, fullKeys, timings);
-
- result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader,
fileSliceKeys, fullKeys, logRecords,
- timings, partitionName));
-
- LOG.debug(String.format("Metadata read for %s keys took [baseFileRead,
logMerge] %s ms",
- fileSliceKeys.size(), timings));
- fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata table
for " + sortedKeys.size() + " key : ", ioe);
- } finally {
- if (!reuse) {
- closeReader(readers);
- }
+ Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+
+ // Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
+ List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
+ k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
+ final int numFileSlices = partitionFileSlices.size();
+ ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for
partition " + partitionName + " should be > 0");
+
+ // Lookup keys from each file slice
+ if (numFileSlices == 1) {
+ // Optimization for a single slice for smaller metadata table partitions
+ result = lookupKeysFromFileSlice(partitionName, keys,
partitionFileSlices.get(0));
+ } else {
+ // Parallel lookup for large sized partitions with many file slices
+ // Partition the keys by the file slice which contains it
+ ArrayList<ArrayList<String>> partitionedKeys = new
ArrayList<>(numFileSlices);
+ for (int i = 0; i < numFileSlices; ++i) {
+ partitionedKeys.add(new ArrayList<>());
}
- });
+ keys.forEach(key -> {
+ int shardIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, numFileSlices);
+ partitionedKeys.get(shardIndex).add(key);
+ });
+
+ result = new HashMap<>(keys.size());
+ engineContext.setJobStatus(this.getClass().getSimpleName(), "Reading
keys from metadata table partition " + partitionName);
+ engineContext.map(partitionedKeys, keysList -> {
Review Comment:
Let's add Javadocs calling out that this is a local engine context which,
under the hood, uses Java parallel streams.
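For illustration, a local engine context's `map` reduces to a Java parallel stream, so the per-element lookups run on the JVM's common fork-join pool rather than on Spark executors; a minimal sketch (names illustrative, not the actual Hudi API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch of a "local engine context" map: parallelism via parallelStream(),
// with encounter order preserved by the ordered collect.
public class LocalEngineContextSketch {
  static <I, O> List<O> map(List<I> data, Function<I, O> func) {
    return data.parallelStream().map(func).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // e.g. looking up each batch of partitioned keys in parallel
    List<Integer> sizes = map(Arrays.asList("a", "bb", "ccc"), String::length);
    System.out.println(sizes); // [1, 2, 3]
  }
}
```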
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object>
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+ // The index to fallback upon when record index is not initialized yet.
+ // This should be a global index like record index so that the behavior of
tagging across partitions is not changed.
+ private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE =
IndexType.GLOBAL_SIMPLE;
+
+ public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
+ int fileGroupSize;
+ try {
+
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+ fileGroupSize =
hoodieTable.getMetadataTable().getNumShards(MetadataPartitionType.RECORD_INDEX);
+ ValidationUtils.checkState(fileGroupSize > 0, "Record index should have
at least one file group");
+ } catch (TableNotFoundException | IllegalStateException e) {
+ // This means that record index has not been initialized.
+ LOG.warn(String.format("Record index not initialized so falling back to
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+ // Fallback to another index so that tagLocation is still accurate and
there are no duplicates.
+ HoodieWriteConfig otherConfig =
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+ HoodieIndex fallbackIndex =
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+ // Fallback index needs to be a global index like record index
+ ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index
needs to be a global index like record index");
+
+ return fallbackIndex.tagLocation(records, context, hoodieTable);
+ }
+
+ // final variable required for lambda functions below
+ final int numFileGroups = fileGroupSize;
+
+ // Partition the record keys to lookup such that each partition looks up
one record index shard
+ JavaRDD<String> partitionedKeyRDD = HoodieJavaRDD.getJavaRDD(records)
+ .map(HoodieRecord::getRecordKey)
+ .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k,
numFileGroups))
+ .partitionBy(new PartitionIdPassthrough(numFileGroups))
+ .map(t -> t._2);
+ ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <=
numFileGroups);
+
+ // Lookup the keys in the record index
+ HoodiePairData<String, HoodieRecordGlobalLocation> keyToLocationPairRDD =
+ HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new
RecordIndexFileGroupLookupFunction(hoodieTable)));
+
+ // Tag the incoming records, as inserts or updates, by joining with
existing record keys
+ HoodieData<HoodieRecord<R>> taggedRecords =
tagLocationBackToRecords(keyToLocationPairRDD, records);
+
+ // The number of partitions in the taggedRecords is expected to be the maximum of the partitions in
+ // keyToLocationPairRDD and records RDD.
+
+ return taggedRecords;
+ }
+
+ @Override
+ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus>
writeStatuses, HoodieEngineContext context,
+ HoodieTable hoodieTable) {
+ // This is a no-op as metadata record index updates are automatically
maintained within the metadata table.
+ return writeStatuses;
+ }
+
+ @Override
+ public boolean rollbackCommit(String instantTime) {
Review Comment:
Maybe we should pass hoodieTable as an argument to rollbackCommit and
leverage the metadata table in hoodieTable, by which we can fetch the MDT
timeline instead of constructing a new one below.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -911,8 +1018,8 @@ public void close() throws Exception {
protected void bulkCommit(
String instantTime, MetadataPartitionType partitionType,
HoodieData<HoodieRecord> records,
int fileGroupCount) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap =
new HashMap<>();
- partitionRecordsMap.put(partitionType, records);
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap =
+ Collections.singletonMap(partitionType, records);
Review Comment:
We can directly do:
```
commit(instantTime, Collections.singletonMap(partitionType, records))
```
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object>
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+ // The index to fallback upon when record index is not initialized yet.
+ // This should be a global index like record index so that the behavior of
tagging across partitions is not changed.
+ private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE =
IndexType.GLOBAL_SIMPLE;
+
+ public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
+ int fileGroupSize;
+ try {
+
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+ fileGroupSize =
hoodieTable.getMetadataTable().getNumShards(MetadataPartitionType.RECORD_INDEX);
+ ValidationUtils.checkState(fileGroupSize > 0, "Record index should have
at least one file group");
+ } catch (TableNotFoundException | IllegalStateException e) {
+ // This means that record index has not been initialized.
+ LOG.warn(String.format("Record index not initialized so falling back to
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+ // Fallback to another index so that tagLocation is still accurate and
there are no duplicates.
+ HoodieWriteConfig otherConfig =
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+ HoodieIndex fallbackIndex =
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+ // Fallback index needs to be a global index like record index
+ ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index
needs to be a global index like record index");
+
+ return fallbackIndex.tagLocation(records, context, hoodieTable);
+ }
+
+ // final variable required for lambda functions below
+ final int numFileGroups = fileGroupSize;
+
+ // Partition the record keys to lookup such that each partition looks up
one record index shard
+ JavaRDD<String> partitionedKeyRDD = HoodieJavaRDD.getJavaRDD(records)
+ .map(HoodieRecord::getRecordKey)
+ .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k,
numFileGroups))
+ .partitionBy(new PartitionIdPassthrough(numFileGroups))
+ .map(t -> t._2);
+ ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <=
numFileGroups);
+
+ // Lookup the keys in the record index
+ HoodiePairData<String, HoodieRecordGlobalLocation> keyToLocationPairRDD =
+ HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new
RecordIndexFileGroupLookupFunction(hoodieTable)));
+
+ // Tag the incoming records, as inserts or updates, by joining with
existing record keys
+ HoodieData<HoodieRecord<R>> taggedRecords =
tagLocationBackToRecords(keyToLocationPairRDD, records);
+
+ // The number of partitions in the taggedRecords is expected to be the maximum of the partitions in
+ // keyToLocationPairRDD and records RDD.
+
+ return taggedRecords;
+ }
+
+ @Override
+ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus>
writeStatuses, HoodieEngineContext context,
+ HoodieTable hoodieTable) {
+ // This is a no-op as metadata record index updates are automatically
maintained within the metadata table.
+ return writeStatuses;
+ }
+
+ @Override
+ public boolean rollbackCommit(String instantTime) {
+ // Only those deltacommits which have a valid completed commit on the
dataset are read. Since, the instantTime
+ // is being rolled back on the dataset, we will not load the records from
the deltacommit and it is virtually
+ // rolled back. In other words, there is no need to rollback any
deltacommit here except if the deltacommit
+ // was compacted and a new basefile has been created.
+ try {
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()))
+ .setConf(new Configuration()).build();
+ HoodieTimeline commitTimeline =
metaClient.getCommitTimeline().filterCompletedInstants();
+ if (commitTimeline.empty()) {
+ // No compaction yet so no need to check for deltacommits due to the
logic above
+ return true;
+ }
+
+ if (HoodieTimeline.compareTimestamps(instantTime, GREATER_THAN,
commitTimeline.lastInstant().get().getTimestamp())) {
+ // After the last compaction so no rollback required as per logic above
+ return true;
+ }
+ LOG.warn("Cannot rollback instant " + instantTime + " because the
corresponding deltacommit has been compacted "
+ + " in " + commitTimeline.lastInstant().get().getTimestamp());
+ return false;
+ } catch (TableNotFoundException e) {
+ // Metadata table is not setup. Nothing to rollback. Exit gracefully.
+ LOG.warn("Cannot rollback instant " + instantTime + " as metadata table
is not found");
+ return true;
+ }
+ }
+
+ @Override
+ public boolean isGlobal() {
+ return true;
+ }
+
+ @Override
+ public boolean canIndexLogFiles() {
+ return true;
Review Comment:
Are these true? Do we consider new inserts added to log files as well?
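As an aside on the rollbackCommit body quoted above: the decision reduces to a timestamp comparison against the last completed compaction. A minimal sketch of that logic, using plain lexicographic comparison as a stand-in for `HoodieTimeline.compareTimestamps` (Hudi instant timestamps sort lexicographically):

```java
import java.util.Optional;

// Sketch of the rollback decision: a deltacommit only needs a real rollback
// if it has already been compacted into a base file.
public class RollbackDecisionSketch {
  static boolean canRollback(String instantTime, Optional<String> lastCompactionInstant) {
    if (!lastCompactionInstant.isPresent()) {
      // No compaction yet: the deltacommit is never read, so it is
      // virtually rolled back already.
      return true;
    }
    // Safe only if the instant was written after the last compaction.
    return instantTime.compareTo(lastCompactionInstant.get()) > 0;
  }

  public static void main(String[] args) {
    System.out.println(canRollback("20230601120000", Optional.empty()));              // true
    System.out.println(canRollback("20230601120000", Optional.of("20230601110000"))); // true
    System.out.println(canRollback("20230601100000", Optional.of("20230601110000"))); // false
  }
}
```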
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -819,9 +917,18 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
* @param instantTime Timestamp at which the commit was performed
*/
@Override
- public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
- processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(
- engineContext, commitMetadata, instantTime,
getRecordsGenerationParams()));
+ public void update(HoodieCommitMetadata commitMetadata,
HoodieData<WriteStatus> writeStatus, String instantTime) {
+ processAndCommit(instantTime, () -> {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap =
+ HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
commitMetadata, instantTime, getRecordsGenerationParams());
+
+ // Updates for record index are created by parsing the WriteStatus which
is a hudi-client object. Hence, we cannot yet move this code
+ // to the HoodieTableMetadataUtil class in hudi-common.
+ if (writeStatus != null && !writeStatus.isEmpty()) {
+ partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX,
getRecordIndexUpdates(writeStatus));
Review Comment:
Why can't we move this within convertMetadataToRecords?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -474,14 +511,80 @@ private Pair<Integer, HoodieData<HoodieRecord>>
initializeBloomFiltersPartition(
return Pair.of(fileGroupCount, records);
}
- private Pair<Integer, HoodieData<HoodieRecord>>
initializeFilesPartition(String createInstantTime, List<DirectoryInfo>
partitionInfoList) {
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeRecordIndexPartition() throws IOException {
+ final HoodieMetadataFileSystemView fsView = new
HoodieMetadataFileSystemView(dataMetaClient,
Review Comment:
Already in L416 to L422, we list all partitions and collect file info.
Can we try reusing that when calling initializeRecordIndexPartition()? For
example, partitionBaseFilePairs in L520 is already available at the caller's
end; we don't need to re-instantiate another FS view.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -494,14 +597,8 @@ private Pair<Integer, HoodieData<HoodieRecord>>
initializeFilesPartition(String
engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating
records for MDT FILES partition");
HoodieData<HoodieRecord> fileListRecords =
engineContext.parallelize(partitionInfoList,
partitionInfoList.size()).map(partitionInfo -> {
Map<String, Long> fileNameToSizeMap =
partitionInfo.getFileNameToSizeMap();
- // filter for files that are part of the completed commits
- Map<String, Long> validFileNameToSizeMap =
fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {
Review Comment:
May I know where this code has been moved? We need to keep this filtering.
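For reference, the filtering the diff removed amounts to keeping only files whose commit instant is on the completed timeline; a self-contained sketch (the trailing underscore-separated token standing in for the instant embedded in Hudi file names -- a simplification, not the real parsing):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch: drop files written by inflight/failed instants so the FILES
// partition only records files belonging to completed commits.
public class CompletedCommitFilterSketch {
  static Map<String, Long> filterValidFiles(Map<String, Long> fileNameToSizeMap,
                                            Set<String> completedInstants) {
    return fileNameToSizeMap.entrySet().stream()
        .filter(e -> {
          String name = e.getKey();
          // Simplified instant extraction for illustration only
          String instant = name.substring(name.lastIndexOf('_') + 1);
          return completedInstants.contains(instant);
        })
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args) {
    Map<String, Long> files = new HashMap<>();
    files.put("fg1_001", 10L);
    files.put("fg1_002", 20L); // instant 002 is inflight, must be filtered out
    Set<String> completed = new HashSet<>(Arrays.asList("001"));
    System.out.println(filterValidFiles(files, completed).keySet()); // [fg1_001]
  }
}
```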
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -241,105 +221,91 @@ public List<MetadataPartitionType>
getEnabledPartitionTypes() {
return this.enabledPartitionTypes;
}
- /**
- * Initialize the metadata table if it does not exist.
- * <p>
- * If the metadata table does not exist, then file and partition listing is
used to initialize the table.
- *
- * @param engineContext
- * @param actionMetadata Action metadata types extending Avro
generated SpecificRecordBase
- * @param inflightInstantTimestamp Timestamp of an instant in progress on
the dataset. This instant is ignored
- * while deciding to initialize the metadata
table.
- */
- protected abstract <T extends SpecificRecordBase> void
initialize(HoodieEngineContext engineContext,
- Option<T>
actionMetadata,
-
Option<String> inflightInstantTimestamp);
-
- public void initTableMetadata() {
- try {
- if (this.metadata != null) {
- this.metadata.close();
- }
- this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(),
- dataWriteConfig.getBasePath(),
dataWriteConfig.getSpillableMapBasePath());
- this.metadataMetaClient = metadata.getMetadataMetaClient();
- } catch (Exception e) {
- throw new HoodieException("Error initializing metadata table for reads",
e);
- }
- }
-
/**
* Initialize the metadata table if needed.
*
* @param dataMetaClient - meta client for the data table
* @param actionMetadata - optional action metadata
* @param inflightInstantTimestamp - timestamp of an instant in progress on
the dataset
* @param <T> - action metadata types extending Avro
generated SpecificRecordBase
- * @throws IOException
+ * @throws IOException on errors
*/
- protected <T extends SpecificRecordBase> void
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
- Option<T>
actionMetadata,
-
Option<String> inflightInstantTimestamp) throws IOException {
+ protected <T extends SpecificRecordBase> boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+
Option<T> actionMetadata,
+
Option<String> inflightInstantTimestamp) throws IOException {
HoodieTimer timer = HoodieTimer.start();
+ List<MetadataPartitionType> partitionsToInit = new
ArrayList<>(MetadataPartitionType.values().length);
- boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+ try {
+ boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+ if (!exists) {
+ // FILES partition is always required
+ partitionsToInit.add(MetadataPartitionType.FILES);
+ }
- if (!exists) {
- // Initialize for the first time by listing partitions and files
directly from the file system
- if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+ // check if any of the enabled partition types needs to be initialized
+ // NOTE: It needs to be guarded by async index config because if that is
enabled then initialization happens through the index scheduler.
+ if (!dataWriteConfig.isMetadataAsyncIndex()) {
+ Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+ LOG.info("Async metadata indexing disabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
+ this.enabledPartitionTypes.stream()
+ .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
+ .forEach(partitionsToInit::add);
}
- return;
- }
- // if metadata table exists, then check if any of the enabled partition
types needs to be initialized
- // NOTE: It needs to be guarded by async index config because if that is
enabled then initialization happens through the index scheduler.
- if (!dataWriteConfig.isMetadataAsyncIndex()) {
- Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
- LOG.info("Async metadata indexing enabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
- List<MetadataPartitionType> partitionsToInit =
this.enabledPartitionTypes.stream()
- .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
- .collect(Collectors.toList());
- // if there are no partitions to initialize or there is a pending
operation, then don't initialize in this round
- if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient,
inflightInstantTimestamp)) {
- return;
+ if (partitionsToInit.isEmpty()) {
+ // No partitions to initialize
+ initMetadataReader();
+ return true;
+ }
+
+ // If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+ // Otherwise, we use the timestamp of the latest completed action.
+ String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
+ // Initialize partitions for the first time using data from the files on
the file system
+ if (!initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp)) {
+ LOG.error("Failed to initialize MDT from filesystem");
+ return false;
}
- String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
- initTableMetadata(); // re-init certain flags in BaseTableMetadata
- initializeEnabledFileGroups(dataMetaClient, createInstantTime,
partitionsToInit);
- initialCommit(createInstantTime, partitionsToInit);
- updateInitializedPartitionsInTableConfig(partitionsToInit);
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+ return true;
+ } catch (IOException e) {
+ LOG.error("Failed to initialize metadata table. Disabling the writer.",
e);
+ return false;
}
}
private <T extends SpecificRecordBase> boolean
metadataTableExists(HoodieTableMetaClient dataMetaClient,
Option<T>
actionMetadata) throws IOException {
- boolean exists = dataMetaClient.getFs().exists(new
Path(metadataWriteConfig.getBasePath(),
- HoodieTableMetaClient.METAFOLDER_NAME));
+ boolean exists = dataMetaClient.getTableConfig().isMetadataTableEnabled();
boolean reInitialize = false;
// If the un-synced instants have been archived, then
// the metadata table will need to be initialized again.
if (exists) {
- HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf.get())
- .setBasePath(metadataWriteConfig.getBasePath()).build();
-
- if (DEFAULT_METADATA_POPULATE_META_FIELDS !=
metadataMetaClient.getTableConfig().populateMetaFields()) {
- LOG.info("Re-initiating metadata table properties since populate meta
fields have changed");
- metadataMetaClient =
initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
+ try {
+ metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+ if (DEFAULT_METADATA_POPULATE_META_FIELDS !=
metadataMetaClient.getTableConfig().populateMetaFields()) {
+ LOG.info("Re-initiating metadata table properties since populate
meta fields have changed");
+ metadataMetaClient = initializeMetaClient();
+ }
+ } catch (TableNotFoundException e) {
+ // Table not found, initialize the metadata table.
+ metadataMetaClient = initializeMetaClient();
Review Comment:
+1
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java:
##########
@@ -135,8 +138,13 @@ protected byte[] serializeRecords(List<HoodieRecord>
records) throws IOException
}
final byte[] recordBytes = serializeRecord(record, writerSchema);
- ValidationUtils.checkState(!sortedRecordsMap.containsKey(recordKey),
- "Writing multiple records with same key not supported for " +
this.getClass().getName());
+ if (sortedRecordsMap.containsKey(recordKey)) {
+ LOG.error("Found duplicate record with recordKey: " + recordKey);
Review Comment:
I feel we should test how HFile write and read work with duplicate records.
If the write will throw in case of duplicate records anyway, why can't we let
them through, so that we don't incur the additional latency of checking for
duplicates ourselves?
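To make the trade-off in this comment concrete, here is a minimal sketch (with hypothetical class and method names, not Hudi's actual writer API) of a sorted record buffer that either rejects duplicate keys up front, paying a `containsKey` per record, or defers to the sorted-map insert, which silently overwrites, much as the format-level write would surface the problem later:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: an HFile-style writer keeps records sorted by key.
// With allowDuplicates=false we pay an extra containsKey per record; with
// allowDuplicates=true the duplicate silently replaces the earlier entry.
public class SortedRecordBuffer {
  private final Map<String, byte[]> sortedRecords = new TreeMap<>();
  private final boolean allowDuplicates;

  public SortedRecordBuffer(boolean allowDuplicates) {
    this.allowDuplicates = allowDuplicates;
  }

  /** Returns false (skipping the record) on a duplicate key when duplicates are disallowed. */
  public boolean add(String recordKey, byte[] payload) {
    if (!allowDuplicates && sortedRecords.containsKey(recordKey)) {
      return false; // a real writer might log or throw here instead
    }
    sortedRecords.put(recordKey, payload);
    return true;
  }

  public int size() {
    return sortedRecords.size();
  }
}
```

The question in the review is exactly whether the up-front check (the `false` branch) earns its cost, given the format may reject or mishandle duplicates on its own.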
##########
hudi-common/src/main/avro/HoodieMetadata.avsc:
##########
@@ -358,6 +358,46 @@
}
],
"default" : null
+ },
+ {
+ "name": "recordIndexMetadata",
+ "doc": "Metadata Index that contains information about record keys
and their location in the dataset",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "HoodieRecordIndexInfo",
+ "fields": [
+ {
+ "name": "partition",
+ "type": "string",
+ "doc": "Partition which contains the record",
+ "avro.java.string": "String"
Review Comment:
For a non-partitioned table, will the value of this field be an empty string?
I ask because every field here is non-nullable except the recordIndexMetadata
root field. Just want to confirm we are good.
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java:
##########
@@ -126,6 +131,13 @@ public boolean canWrite() {
@Override
public void writeAvro(String recordKey, IndexedRecord record) throws
IOException {
+ if (!this.hfileConfig.allowDuplicatesToBeInserted()) {
Review Comment:
do we have tests for these?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -222,32 +192,30 @@ public Map<Pair<String, String>, BloomFilter>
getBloomFilters(final List<Pair<St
);
List<String> partitionIDFileIDStrings = new
ArrayList<>(partitionIDFileIDSortedStrings);
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
hoodieRecordList =
+ Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
getRecordsByKeys(partitionIDFileIDStrings,
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
(timer.endTimer() / partitionIDFileIDStrings.size())));
- Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new
HashMap<>();
- for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry
: hoodieRecordList) {
- if (entry.getRight().isPresent()) {
- final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
- entry.getRight().get().getData().getBloomFilterMetadata();
- if (bloomFilterMetadata.isPresent()) {
- if (!bloomFilterMetadata.get().getIsDeleted()) {
-
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
- // NOTE: We have to duplicate the [[ByteBuffer]] object here since:
- // - Reading out [[ByteBuffer]] mutates its state
- // - [[BloomFilterMetadata]] could be re-used, and hence
have to stay immutable
- final ByteBuffer bloomFilterByteBuffer =
- bloomFilterMetadata.get().getBloomFilter().duplicate();
- final String bloomFilterType = bloomFilterMetadata.get().getType();
- final BloomFilter bloomFilter = BloomFilterFactory.fromString(
-
StandardCharsets.UTF_8.decode(bloomFilterByteBuffer).toString(),
bloomFilterType);
-
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()),
bloomFilter);
- }
- } else {
- LOG.error("Meta index bloom filter missing for: " +
fileToKeyMap.get(entry.getLeft()));
+ Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new
HashMap<>(hoodieRecords.size());
+ for (final Map.Entry<String, HoodieRecord<HoodieMetadataPayload>> entry :
hoodieRecords.entrySet()) {
Review Comment:
OK, looks like we don't do anything if no value is found, so we should be
good here. But something to remember.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -275,30 +243,51 @@ public Map<Pair<String, String>,
HoodieMetadataColumnStats> getColumnStats(final
List<String> columnStatKeys = new ArrayList<>(sortedKeys);
HoodieTimer timer = HoodieTimer.start();
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
hoodieRecordList =
+ Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
Review Comment:
We should avoid the sorting at L234 to 241; getRecordsByKeys() sorts all
keys before any lookup anyway.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -143,46 +126,33 @@ public List<String> getAllPartitionPaths() throws
IOException {
* @param partitionPath The absolute path of the partition to list
*/
@Override
- public FileStatus[] getAllFilesInPartition(Path partitionPath)
- throws IOException {
- if (isMetadataTableEnabled) {
- try {
- return fetchAllFilesInPartition(partitionPath);
- } catch (Exception e) {
- throw new HoodieMetadataException("Failed to retrieve files in
partition " + partitionPath + " from metadata", e);
- }
+ public FileStatus[] getAllFilesInPartition(Path partitionPath) throws
IOException {
+ ValidationUtils.checkArgument(isMetadataTableInitialized);
+ try {
+ return fetchAllFilesInPartition(partitionPath);
+ } catch (Exception e) {
+ throw new HoodieMetadataException("Failed to retrieve files in partition
" + partitionPath + " from metadata", e);
}
-
- FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
Review Comment:
Did we chase down all the reader code paths for this? For example,
HoodieBackedTableMetadata could be used by readers even when the MDT is not
enabled or fully formed, so these served as fallbacks. We should ensure all
readers use FileSystemBackedTableMetadata when the MDT is not enabled before
removing these.
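The fallback this comment asks to preserve can be sketched as a small factory (names here are illustrative, not Hudi's actual classes) that hands readers a filesystem-backed listing whenever the metadata table is not both enabled and initialized:

```java
// Hedged sketch of the fallback pattern: readers get a filesystem-backed
// metadata implementation unless the MDT is enabled AND fully initialized.
// Interface and class names are hypothetical stand-ins for illustration.
public class MetadataReaderFactory {
  interface TableMetadata {
    String describe();
  }

  static final class MetadataBacked implements TableMetadata {
    public String describe() { return "metadata-table"; }
  }

  static final class FileSystemBacked implements TableMetadata {
    public String describe() { return "filesystem-listing"; }
  }

  public static TableMetadata create(boolean mdtEnabled, boolean mdtInitialized) {
    if (mdtEnabled && mdtInitialized) {
      return new MetadataBacked();
    }
    // The fallback path the review asks to keep for readers on tables
    // where the MDT is disabled or only partially formed.
    return new FileSystemBacked();
  }
}
```

The concern is that removing the fallback branch makes every reader assume an initialized MDT, which the `ValidationUtils.checkArgument(isMetadataTableInitialized)` in the diff enforces but does not gracefully handle.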
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -193,121 +190,126 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(L
return Collections.emptyIterator();
}
- boolean fullKeys = false;
+ boolean fullKeys = false;
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>>
logRecords =
- readLogRecords(logRecordScanner, sortedKeyPrefixes,
fullKeys, timings);
+ Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
+ readLogRecords(logRecordScanner, sortedKeyPrefixes,
fullKeys, timings);
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
mergedRecords =
- readFromBaseAndMergeWithLogRecords(baseFileReader,
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+ Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords
=
+ readFromBaseAndMergeWithLogRecords(baseFileReader,
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
- LOG.debug(String.format("Metadata read for %s keys took
[baseFileRead, logMerge] %s ms",
- sortedKeyPrefixes.size(), timings));
+ LOG.debug(String.format("Metadata read for %s keys took
[baseFileRead, logMerge] %s ms",
+ sortedKeyPrefixes.size(), timings));
- return mergedRecords.stream()
- .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
- .filter(Objects::nonNull)
- .iterator();
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata
table for " + sortedKeyPrefixes.size() + " key : ", ioe);
- } finally {
- closeReader(readers);
- }
- });
+ return mergedRecords.values().iterator();
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from
metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe);
+ } finally {
+ closeReader(readers);
+ }
+ });
}
@Override
- public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordsByKeys(List<String> keys,
-
String partitionName) {
- // Sort the columns so that keys are looked up in order
- List<String> sortedKeys = new ArrayList<>(keys);
- Collections.sort(sortedKeys);
- Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap =
getPartitionFileSliceToKeysMapping(partitionName, sortedKeys);
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result =
new ArrayList<>();
- AtomicInteger fileSlicesKeysCount = new AtomicInteger();
- partitionFileSliceToKeysMap.forEach((partitionFileSlicePair,
fileSliceKeys) -> {
- Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
- getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
- try {
- List<Long> timings = new ArrayList<>();
- HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
- HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
- if (baseFileReader == null && logRecordScanner == null) {
- return;
- }
+ protected Map<String, HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeys(List<String> keys, String partitionName) {
+ if (keys.isEmpty()) {
+ return Collections.emptyMap();
+ }
- boolean fullKeys = true;
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
- readLogRecords(logRecordScanner, fileSliceKeys, fullKeys, timings);
-
- result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader,
fileSliceKeys, fullKeys, logRecords,
- timings, partitionName));
-
- LOG.debug(String.format("Metadata read for %s keys took [baseFileRead,
logMerge] %s ms",
- fileSliceKeys.size(), timings));
- fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata table
for " + sortedKeys.size() + " key : ", ioe);
- } finally {
- if (!reuse) {
- closeReader(readers);
- }
+ Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+
Review Comment:
I see we were sorting the keys before and have now removed it. Can we keep
the sorting here and remove it from every other flow? The entire metadata
table is based on HFile, which works on sorted entries, so it makes sense to
sort the keys here.
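The "sort once at the entry point" suggestion amounts to something like the sketch below (method name is hypothetical): callers pass keys in any order, and the single lookup boundary sorts defensively, making per-caller pre-sorting (such as the sorted sets built in BaseTableMetadata) redundant:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the suggestion: sort exactly once at the shared lookup entry
// point, since HFile readers seek in key order. Callers then never need
// to pre-sort. Class and method names are illustrative, not Hudi's API.
public class KeyLookup {
  /** Returns a sorted copy so the caller's list is left untouched. */
  public static List<String> toSortedLookupOrder(List<String> keys) {
    List<String> sorted = new ArrayList<>(keys);
    Collections.sort(sorted);
    return sorted;
  }
}
```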
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -222,32 +192,30 @@ public Map<Pair<String, String>, BloomFilter>
getBloomFilters(final List<Pair<St
);
List<String> partitionIDFileIDStrings = new
ArrayList<>(partitionIDFileIDSortedStrings);
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
hoodieRecordList =
+ Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
getRecordsByKeys(partitionIDFileIDStrings,
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
(timer.endTimer() / partitionIDFileIDStrings.size())));
- Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new
HashMap<>();
- for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry
: hoodieRecordList) {
- if (entry.getRight().isPresent()) {
- final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
- entry.getRight().get().getData().getBloomFilterMetadata();
- if (bloomFilterMetadata.isPresent()) {
- if (!bloomFilterMetadata.get().getIsDeleted()) {
-
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
- // NOTE: We have to duplicate the [[ByteBuffer]] object here since:
- // - Reading out [[ByteBuffer]] mutates its state
- // - [[BloomFilterMetadata]] could be re-used, and hence
have to stay immutable
- final ByteBuffer bloomFilterByteBuffer =
- bloomFilterMetadata.get().getBloomFilter().duplicate();
- final String bloomFilterType = bloomFilterMetadata.get().getType();
- final BloomFilter bloomFilter = BloomFilterFactory.fromString(
-
StandardCharsets.UTF_8.decode(bloomFilterByteBuffer).toString(),
bloomFilterType);
-
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()),
bloomFilter);
- }
- } else {
- LOG.error("Meta index bloom filter missing for: " +
fileToKeyMap.get(entry.getLeft()));
+ Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new
HashMap<>(hoodieRecords.size());
+ for (final Map.Entry<String, HoodieRecord<HoodieMetadataPayload>> entry :
hoodieRecords.entrySet()) {
Review Comment:
We should iterate over the incoming list here, i.e. partitionNameFileNameList,
because due to the perf fix we made, getRecordsByKeys() will only return valid
entries. If some key is not found, it may not return any value for it, so
callers have to account for that. In RLI, that is why we do a left outer join
with the incoming records.
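The caller-side pattern this comment asks for might look like the following sketch (simplified types, hypothetical names): iterate the incoming key list rather than the result map, so absent keys are handled explicitly, analogous to the left outer join used in RLI:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: getRecordsByKeys-style lookups now return only the keys that were
// found, so the caller drives iteration from its own requested-key list and
// decides what to do for misses. Names/types are illustrative stand-ins.
public class BloomFilterLookup {
  public static Map<String, String> resolve(List<String> requestedKeys,
                                            Map<String, String> foundRecords) {
    Map<String, String> resolved = new HashMap<>(requestedKeys.size());
    for (String key : requestedKeys) {
      String value = foundRecords.get(key);
      if (value != null) {
        resolved.put(key, value);
      }
      // else: key absent from metadata; the caller may log, skip, or
      // fall back, but must not assume the result map covers every key.
    }
    return resolved;
  }
}
```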
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java:
##########
@@ -121,7 +123,7 @@ public HoodieCleanMetadata doClean(String commitTime,
Map<String, Integer> parti
public HoodieTestTable addCompaction(String instantTime,
HoodieCommitMetadata commitMetadata) throws Exception {
super.addCompaction(instantTime, commitMetadata);
if (writer != null) {
- writer.update(commitMetadata, instantTime);
+ writer.update(commitMetadata,
HoodieListData.eager(Collections.EMPTY_LIST), instantTime);
Review Comment:
All of these need to be fixed.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -123,6 +124,9 @@ public static HoodieWriteConfig createMetadataWriteConfig(
.withAllowMultiWriteOnSameInstant(true)
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
+ .withStorageConfig(HoodieStorageConfig.newBuilder()
+
.hfileWriterToAllowDuplicates(writeConfig.hfileWriterToAllowDuplicates())
Review Comment:
Curious to understand in which flow we allow duplicates to get written to an
HFile. Also, to my knowledge, I don't think our readers (full scan or
on-demand) can handle duplicates in an HFile.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -96,22 +93,24 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
private final Transient<Map<Pair<String, String>,
Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>>>
partitionReaders =
Transient.lazy(ConcurrentHashMap::new);
- public HoodieBackedTableMetadata(HoodieEngineContext engineContext,
HoodieMetadataConfig metadataConfig,
- String datasetBasePath, String
spillableMapDirectory) {
- this(engineContext, metadataConfig, datasetBasePath,
spillableMapDirectory, false);
+ // Latest file slices in the metadata partitions
+ private final Map<String, List<FileSlice>> partitionFileSliceMap = new
ConcurrentHashMap<>();
Review Comment:
Let's ensure we clean this up in reset() and close(), especially reset(); if
not, it could lead to stale data being served from the timeline server.
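The cleanup being requested can be sketched as below (a simplified stand-in, not the real HoodieBackedTableMetadata): the cached partition-to-file-slice map is cleared in both reset() and close(), so a long-lived reader behind the timeline server cannot keep serving slices from a superseded timeline:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the requested cleanup. The cache field mirrors the
// partitionFileSliceMap added in the diff, but the surrounding class is
// a hypothetical simplification for illustration only.
public class CachedSliceHolder implements AutoCloseable {
  private final Map<String, List<String>> partitionFileSliceMap = new ConcurrentHashMap<>();

  public List<String> getOrLoad(String partition, List<String> freshlyListed) {
    // Cached on first access; without reset(), later timeline changes
    // would never be reflected here.
    return partitionFileSliceMap.computeIfAbsent(partition, p -> freshlyListed);
  }

  /** Must be called whenever the underlying timeline changes. */
  public void reset() {
    partitionFileSliceMap.clear();
  }

  @Override
  public void close() {
    reset();
  }

  public boolean isEmpty() {
    return partitionFileSliceMap.isEmpty();
  }
}
```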
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -193,121 +190,126 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(L
return Collections.emptyIterator();
}
- boolean fullKeys = false;
+ boolean fullKeys = false;
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>>
logRecords =
- readLogRecords(logRecordScanner, sortedKeyPrefixes,
fullKeys, timings);
+ Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
+ readLogRecords(logRecordScanner, sortedKeyPrefixes,
fullKeys, timings);
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
mergedRecords =
- readFromBaseAndMergeWithLogRecords(baseFileReader,
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+ Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords
=
+ readFromBaseAndMergeWithLogRecords(baseFileReader,
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
- LOG.debug(String.format("Metadata read for %s keys took
[baseFileRead, logMerge] %s ms",
- sortedKeyPrefixes.size(), timings));
+ LOG.debug(String.format("Metadata read for %s keys took
[baseFileRead, logMerge] %s ms",
+ sortedKeyPrefixes.size(), timings));
- return mergedRecords.stream()
- .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
- .filter(Objects::nonNull)
- .iterator();
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata
table for " + sortedKeyPrefixes.size() + " key : ", ioe);
- } finally {
- closeReader(readers);
- }
- });
+ return mergedRecords.values().iterator();
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from
metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe);
+ } finally {
+ closeReader(readers);
+ }
+ });
}
@Override
- public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordsByKeys(List<String> keys,
-
String partitionName) {
- // Sort the columns so that keys are looked up in order
- List<String> sortedKeys = new ArrayList<>(keys);
- Collections.sort(sortedKeys);
- Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap =
getPartitionFileSliceToKeysMapping(partitionName, sortedKeys);
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result =
new ArrayList<>();
- AtomicInteger fileSlicesKeysCount = new AtomicInteger();
- partitionFileSliceToKeysMap.forEach((partitionFileSlicePair,
fileSliceKeys) -> {
- Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
- getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
- try {
- List<Long> timings = new ArrayList<>();
- HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
- HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
- if (baseFileReader == null && logRecordScanner == null) {
- return;
- }
+ protected Map<String, HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeys(List<String> keys, String partitionName) {
+ if (keys.isEmpty()) {
+ return Collections.emptyMap();
+ }
- boolean fullKeys = true;
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
- readLogRecords(logRecordScanner, fileSliceKeys, fullKeys, timings);
-
- result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader,
fileSliceKeys, fullKeys, logRecords,
- timings, partitionName));
-
- LOG.debug(String.format("Metadata read for %s keys took [baseFileRead,
logMerge] %s ms",
- fileSliceKeys.size(), timings));
- fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata table
for " + sortedKeys.size() + " key : ", ioe);
- } finally {
- if (!reuse) {
- closeReader(readers);
- }
+ Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+
+ // Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
+ List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
+ k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
+ final int numFileSlices = partitionFileSlices.size();
+ ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for
partition " + partitionName + " should be > 0");
+
+ // Lookup keys from each file slice
+ if (numFileSlices == 1) {
+ // Optimization for a single slice for smaller metadata table partitions
+ result = lookupKeysFromFileSlice(partitionName, keys,
partitionFileSlices.get(0));
+ } else {
+ // Parallel lookup for large sized partitions with many file slices
+ // Partition the keys by the file slice which contains it
+ ArrayList<ArrayList<String>> partitionedKeys = new
ArrayList<>(numFileSlices);
+ for (int i = 0; i < numFileSlices; ++i) {
+ partitionedKeys.add(new ArrayList<>());
}
- });
+ keys.forEach(key -> {
+ int shardIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, numFileSlices);
Review Comment:
Not in this patch, but the driver (in the case of the RLI-based index) already
maps keys to file groups based on the hash and calls mapPartitions. So if we
pass the file group id along with getRecordsByKeys(), we can avoid these
additional mapping calls. At least with RLI, each Spark task is going to look
up only one file slice.
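The routing this comment describes is, in outline, a deterministic hash of the record key into one of N file groups; since both driver and executor compute the same function, the executor-side re-mapping is redundant once the driver's bucket id is passed through. The hash below is a generic stand-in, not Hudi's actual mapRecordKeyToFileGroupIndex implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of hash-based key-to-file-group routing. The hash
// function here is a placeholder; the real implementation in
// HoodieTableMetadataUtil may differ, and the point is only that it is
// deterministic, so computing it twice (driver + executor) is wasted work.
public class FileGroupRouter {
  public static int fileGroupIndex(String recordKey, int numFileGroups) {
    return (recordKey.hashCode() & Integer.MAX_VALUE) % numFileGroups;
  }

  /** Buckets keys so each bucket is looked up against exactly one file slice. */
  public static List<List<String>> partitionKeys(List<String> keys, int numFileGroups) {
    List<List<String>> buckets = new ArrayList<>(numFileGroups);
    for (int i = 0; i < numFileGroups; i++) {
      buckets.add(new ArrayList<>());
    }
    for (String key : keys) {
      buckets.get(fileGroupIndex(key, numFileGroups)).add(key);
    }
    return buckets;
  }
}
```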
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -296,23 +324,6 @@ public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List
return new HoodieAvroRecord<>(key, payload);
}
- /**
- * Create and return a {@code HoodieMetadataPayload} to save list of
partitions.
- *
- * @param partitionsAdded The list of added partitions
- * @param partitionsDeleted The list of deleted partitions
- */
- public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List<String> partitionsAdded, List<String>
partitionsDeleted) {
Review Comment:
Was this never used at all?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -241,105 +221,91 @@ public List<MetadataPartitionType>
getEnabledPartitionTypes() {
return this.enabledPartitionTypes;
}
- /**
- * Initialize the metadata table if it does not exist.
- * <p>
- * If the metadata table does not exist, then file and partition listing is
used to initialize the table.
- *
- * @param engineContext
- * @param actionMetadata Action metadata types extending Avro
generated SpecificRecordBase
- * @param inflightInstantTimestamp Timestamp of an instant in progress on
the dataset. This instant is ignored
- * while deciding to initialize the metadata
table.
- */
- protected abstract <T extends SpecificRecordBase> void
initialize(HoodieEngineContext engineContext,
- Option<T>
actionMetadata,
-
Option<String> inflightInstantTimestamp);
-
- public void initTableMetadata() {
- try {
- if (this.metadata != null) {
- this.metadata.close();
- }
- this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(),
- dataWriteConfig.getBasePath(),
dataWriteConfig.getSpillableMapBasePath());
- this.metadataMetaClient = metadata.getMetadataMetaClient();
- } catch (Exception e) {
- throw new HoodieException("Error initializing metadata table for reads",
e);
- }
- }
-
/**
* Initialize the metadata table if needed.
*
* @param dataMetaClient - meta client for the data table
* @param actionMetadata - optional action metadata
* @param inflightInstantTimestamp - timestamp of an instant in progress on
the dataset
* @param <T> - action metadata types extending Avro
generated SpecificRecordBase
- * @throws IOException
+ * @throws IOException on errors
*/
- protected <T extends SpecificRecordBase> void
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
- Option<T>
actionMetadata,
-
Option<String> inflightInstantTimestamp) throws IOException {
+ protected <T extends SpecificRecordBase> boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+
Option<T> actionMetadata,
+
Option<String> inflightInstantTimestamp) throws IOException {
HoodieTimer timer = HoodieTimer.start();
+ List<MetadataPartitionType> partitionsToInit = new
ArrayList<>(MetadataPartitionType.values().length);
- boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+ try {
+ boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+ if (!exists) {
+ // FILES partition is always required
+ partitionsToInit.add(MetadataPartitionType.FILES);
+ }
- if (!exists) {
- // Initialize for the first time by listing partitions and files
directly from the file system
- if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+ // check if any of the enabled partition types needs to be initialized
+ // NOTE: It needs to be guarded by async index config because if that is
enabled then initialization happens through the index scheduler.
+ if (!dataWriteConfig.isMetadataAsyncIndex()) {
+ Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+ LOG.info("Async metadata indexing disabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
+ this.enabledPartitionTypes.stream()
+ .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
+ .forEach(partitionsToInit::add);
}
- return;
- }
- // if metadata table exists, then check if any of the enabled partition
types needs to be initialized
- // NOTE: It needs to be guarded by async index config because if that is
enabled then initialization happens through the index scheduler.
- if (!dataWriteConfig.isMetadataAsyncIndex()) {
- Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
- LOG.info("Async metadata indexing enabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
- List<MetadataPartitionType> partitionsToInit =
this.enabledPartitionTypes.stream()
- .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
- .collect(Collectors.toList());
- // if there are no partitions to initialize or there is a pending
operation, then don't initialize in this round
- if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient,
inflightInstantTimestamp)) {
- return;
+ if (partitionsToInit.isEmpty()) {
+ // No partitions to initialize
+ initMetadataReader();
+ return true;
Review Comment:
I guess it has to be true.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -224,4 +232,9 @@ HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(List<Str
* Clear the states of the table metadata.
*/
void reset();
+
+ /**
+ * Returns the number of shards in a metadata table partition.
+ */
+ int getNumShards(MetadataPartitionType partition);
Review Comment:
"Shards" is new terminology for Hudi in general. An engineer coming from a
database or data-infra background might know what a shard is, but can we use
"getNumFileGroups" so that it is in line with our configs like min file
groups, max file groups, etc.?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1000,87 +1136,78 @@ protected void cleanIfNecessary(BaseHoodieWriteClient
writeClient, String instan
// Trigger cleaning with suffixes based on the same instant time. This
ensures that any future
// delta commits synced over will not have an instant time lesser than the
last completed instant on the
// metadata table.
- writeClient.clean(instantTime + "002");
+
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
writeClient.lazyRollbackFailedIndexing();
}
/**
- * This is invoked to initialize metadata table for a dataset.
- * Initial commit has special handling mechanism due to its scale compared
to other regular commits.
- * During cold startup, the list of files to be committed can be huge.
- * So creating a HoodieCommitMetadata out of these large number of files,
- * and calling the existing update(HoodieCommitMetadata) function does not
scale well.
- * Hence, we have a special commit just for the initialization scenario.
+ * Validates the timeline for both main and metadata tables to ensure
compaction on MDT can be scheduled.
*/
- private void initialCommit(String createInstantTime,
List<MetadataPartitionType> partitionTypes) {
- // List all partitions in the basePath of the containing dataset
- LOG.info("Initializing metadata table by using file listings in " +
dataWriteConfig.getBasePath());
- engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing
metadata table by listing files and partitions: " +
dataWriteConfig.getTableName());
-
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap
= new HashMap<>();
-
- // skip file system listing to populate metadata records if it's a fresh
table.
- // this is applicable only if the table already has N commits and metadata
is enabled at a later point in time.
- if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { //
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
- // If not, last completed commit in data table will be chosen as the
initial commit time.
- LOG.info("Triggering empty Commit to metadata to initialize");
- } else {
- List<DirectoryInfo> partitionInfoList =
listAllPartitions(dataMetaClient);
- Map<String, Map<String, Long>> partitionToFilesMap =
partitionInfoList.stream()
- .map(p -> {
- String partitionName =
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
- return Pair.of(partitionName, p.getFileNameToSizeMap());
- })
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
- int totalDataFilesCount =
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
- List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
- if (partitionTypes.contains(MetadataPartitionType.FILES)) {
- // Record which saves the list of all partitions
- HoodieRecord allPartitionRecord =
HoodieMetadataPayload.createPartitionListRecord(partitions);
- HoodieData<HoodieRecord> filesPartitionRecords =
getFilesPartitionRecords(createInstantTime, partitionInfoList,
allPartitionRecord);
- ValidationUtils.checkState(filesPartitionRecords.count() ==
(partitions.size() + 1));
- partitionToRecordsMap.put(MetadataPartitionType.FILES,
filesPartitionRecords);
- }
-
- if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) &&
totalDataFilesCount > 0) {
- final HoodieData<HoodieRecord> recordsRDD =
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
- engineContext, Collections.emptyMap(), partitionToFilesMap,
getRecordsGenerationParams(), createInstantTime);
- partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS,
recordsRDD);
- }
+ private boolean validateTimelineBeforeSchedulingCompaction(Option<String>
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+ // We need to find whether there are any inflights in the data table timeline
before or equal to the latest delta commit in the metadata table.
+ // Whenever you want to change this logic, please ensure all scenarios below
are considered.
+ // a. The latest delta commit in the MDT may have been committed in the MDT
but failed in the DT, so findInstantsBeforeOrEquals() should be employed.
+ // b. There could be DT inflights after the latest delta commit in the MDT,
and we are ok with that, because the contract is that the latest compaction
instant time in the MDT means
+ // every instant before it is already synced with the metadata table.
+ // c. Do consider out-of-order commits. For example, c4 from the DT could
complete before c3, and we can't trigger compaction in the MDT with c4 as the
base instant time until every
+ // instant before c4 is synced with the metadata table.
+ List<HoodieInstant> pendingInstants =
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
- if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) &&
totalDataFilesCount > 0) {
- final HoodieData<HoodieRecord> recordsRDD =
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
- engineContext, Collections.emptyMap(), partitionToFilesMap,
getRecordsGenerationParams());
- partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS,
recordsRDD);
- }
- LOG.info("Committing " + partitions.size() + " partitions and " +
totalDataFilesCount + " files to metadata");
+ if (!pendingInstants.isEmpty()) {
+ checkNumDeltaCommits(metadataMetaClient,
dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
+ LOG.info(String.format(
+ "Cannot compact metadata table as there are %d inflight instants in
data table before latest deltacommit in metadata table: %s. Inflight instants
in data table: %s",
+ pendingInstants.size(), latestDeltaCommitTimeInMetadataTable,
Arrays.toString(pendingInstants.toArray())));
+ return false;
}
- commit(createInstantTime, partitionToRecordsMap, false);
+ return true;
}
- private HoodieData<HoodieRecord> getFilesPartitionRecords(String
createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord
allPartitionRecord) {
- HoodieData<HoodieRecord> filesPartitionRecords =
engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
- if (partitionInfoList.isEmpty()) {
- return filesPartitionRecords;
- }
+ /**
+ * Returns records that represent updates to the record index due to a write
operation on the dataset.
+ *
+ * @param writeStatuses {@code WriteStatus} from the write operation
+ */
+ private HoodieData<HoodieRecord>
getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) {
+ return writeStatuses.flatMap(writeStatus -> {
+ List<HoodieRecord> recordList = new LinkedList<>();
+ for (HoodieRecord writtenRecord : writeStatus.getWrittenRecords()) {
+ if (!writeStatus.isErrored(writtenRecord.getKey())) {
+ HoodieRecord hoodieRecord;
+ HoodieKey key = writtenRecord.getKey();
+ Option<HoodieRecordLocation> newLocation =
writtenRecord.getNewLocation();
+ if (newLocation.isPresent()) {
+ if (writtenRecord.getCurrentLocation() != null) {
+ // This is an update, no need to update index if the location
has not changed
+ // newLocation should have the same fileID as currentLocation.
The instantTimes differ as newLocation's
+ // instantTime refers to the current commit which was completed.
+ if
(!writtenRecord.getCurrentLocation().getFileId().equals(newLocation.get().getFileId()))
{
+ final String msg = String.format("Detected update in location
of record with key %s from %s "
Review Comment:
we can take it up in a follow-up patch
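The timeline guard in validateTimelineBeforeSchedulingCompaction above boils down to: compaction on the MDT is safe only when no data-table instant at or before the latest MDT deltacommit is still inflight. A minimal sketch of that check (hypothetical helper, not Hudi's actual API; it assumes instant times are fixed-width timestamps so lexicographic comparison matches chronological order):

```java
import java.util.List;

public class CompactionGuard {
  // Returns true when it is safe to schedule MDT compaction: no data-table
  // instant at or before the latest MDT deltacommit is still inflight.
  public static boolean canScheduleCompaction(List<String> inflightInstantTimes,
                                              String latestMdtDeltaCommitTime) {
    // Fixed-width instant timestamps (e.g. yyyyMMddHHmmssSSS) compare
    // chronologically via String.compareTo.
    return inflightInstantTimes.stream()
        .noneMatch(t -> t.compareTo(latestMdtDeltaCommitTime) <= 0);
  }
}
```

This also covers the out-of-order case (c) above: an inflight c3 blocks scheduling while the latest synced instant is c4, because c3 compares less than or equal to c4.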
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1620,4 +1620,72 @@ public static String createIndexInitTimestamp(String
timestamp, int offset) {
public static String createLogCompactionTimestamp(String timestamp) {
return timestamp + LOG_COMPACTION_TIMESTAMP_SUFFIX;
}
+
+ /**
+ * Estimates the file group count to use for a MDT partition.
+ *
+ * @param partitionType Type of the partition for which the file
group count is to be estimated.
+ * @param recordCount The number of records expected to be written.
+ * @param averageRecordSize Average size of each record to be written.
+ * @param minFileGroupCount Minimum number of file groups to use.
+ * @param maxFileGroupCount Maximum number of file groups to use.
+ * @param growthFactor By what factor are the records (recordCount)
expected to grow?
+ * @param maxFileGroupSizeBytes Maximum size of the file group.
+ * @return The estimated number of file groups.
+ */
+ public static int estimateFileGroupCount(MetadataPartitionType
partitionType, long recordCount, int averageRecordSize, int minFileGroupCount,
Review Comment:
do we have tests for this?
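A hypothetical sketch of how such an estimate could be derived from the documented parameters (this is not Hudi's actual implementation, just the arithmetic the javadoc implies: grow the record count, convert to bytes, divide by the max file group size, clamp into the configured bounds), with the kind of clamp behavior those tests would cover:

```java
public class FileGroupCountEstimator {
  // Estimates how many file groups are needed so that, after the expected
  // growth, each file group stays under maxFileGroupSizeBytes.
  public static int estimateFileGroupCount(long recordCount, int averageRecordSize,
      int minFileGroupCount, int maxFileGroupCount, float growthFactor,
      long maxFileGroupSizeBytes) {
    long expectedRecords = (long) (recordCount * growthFactor);
    long expectedBytes = expectedRecords * averageRecordSize;
    int needed = (int) Math.ceil((double) expectedBytes / maxFileGroupSizeBytes);
    // Clamp into the configured [min, max] range.
    return Math.max(minFileGroupCount, Math.min(maxFileGroupCount, needed));
  }
}
```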
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java:
##########
@@ -545,6 +545,33 @@ public void close() {
}
}
+ @Override
+ public ClosableIterator<String> getRecordKeyIterator() {
+ final HFileScanner scanner = reader.getScanner(false, false);
Review Comment:
NTS: this is not used for metadata table reads; we use
getRecordsByKeysIterator().
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -111,18 +111,27 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
public static final String METADATA_COMPACTION_TIME_SUFFIX = "001";
+ // Virtual keys support for the metadata table. This field is
+ // from the metadata payload schema.
+ private static final String RECORD_KEY_FIELD_NAME =
HoodieMetadataPayload.KEY_FIELD_NAME;
+
+ // Average size of a record saved within the record index.
+ // Record index has a fixed size schema. This has been calculated based on
experiments with default settings
+ // for block size (4MB), compression (GZ) and disabling the hudi metadata
fields.
Review Comment:
is this comment addressed?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.execution.PartitionIdPassthrough;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object>
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+ // The index to fall back on when the record index is not initialized yet.
+ // This should be a global index like record index so that the behavior of
tagging across partitions is not changed.
+ private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE =
IndexType.GLOBAL_SIMPLE;
+
+ public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
+ int fileGroupSize;
+ try {
+
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+ fileGroupSize =
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(hoodieTable.getMetaClient(),
(HoodieTableFileSystemView) hoodieTable.getFileSystemView(),
+ MetadataPartitionType.RECORD_INDEX.getPartitionPath()).size();
+ ValidationUtils.checkState(fileGroupSize > 0, "Record index should have
at least one file group");
+ } catch (TableNotFoundException | IllegalStateException e) {
+ // This means that record index has not been initialized.
+ LOG.warn(String.format("Record index not initialized so falling back to
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+ // Fallback to another index so that tagLocation is still accurate and
there are no duplicates.
+ HoodieWriteConfig otherConfig =
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+ HoodieIndex fallbackIndex =
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+ // Fallback index needs to be a global index like record index
+ ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index
needs to be a global index like record index");
+
+ return fallbackIndex.tagLocation(records, context, hoodieTable);
+ }
+
+ // final variable required for lambda functions below
+ final int numFileGroups = fileGroupSize;
+
+ // Partition the record keys to lookup such that each partition looks up
one record index shard
+ JavaRDD<String> partitionedKeyRDD = HoodieJavaRDD.getJavaRDD(records)
+ .map(HoodieRecord::getRecordKey)
+ .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k,
numFileGroups))
+ .partitionBy(new PartitionIdPassthrough(numFileGroups))
+ .map(t -> t._2);
+ ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <=
numFileGroups);
+
+ // Lookup the keys in the record index
+ HoodiePairData<String, HoodieRecordGlobalLocation> keyToLocationPairRDD =
+ HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new
RecordIndexFileGroupLookupFunction(hoodieTable)));
+
+ // Tag the incoming records, as inserts or updates, by joining with
existing record keys
+ HoodieData<HoodieRecord<R>> taggedRecords =
tagLocationBackToRecords(keyToLocationPairRDD, records);
+
+ // The number of partitions in taggedRecords is expected to be the maximum of
the partitions in
+ // keyToLocationPairRDD and the records RDD.
+
+ return taggedRecords;
+ }
+
+ @Override
+ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus>
writeStatuses, HoodieEngineContext context,
+ HoodieTable hoodieTable) {
+ // This is a no-op as metadata record index updates are automatically
maintained within the metadata table.
+ return writeStatuses;
+ }
+
+ @Override
+ public boolean rollbackCommit(String instantTime) {
+ // Only those deltacommits which have a valid completed commit on the
dataset are read. Since, the instantTime
+ // is being rolled back on the dataset, we will not load the records from
the deltacommit and it is virtually
+ // rolled back. In other words, there is no need to rollback any
deltacommit here except if the deltacommit
+ // was compacted and a new basefile has been created.
+ try {
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()))
+ .setConf(new Configuration()).build();
+ HoodieTimeline commitTimeline =
metaClient.getCommitTimeline().filterCompletedInstants();
+ if (commitTimeline.empty()) {
+ // No compaction yet so no need to check for deltacommits due to the
logic above
+ return true;
+ }
+
+ if (HoodieTimeline.compareTimestamps(instantTime, GREATER_THAN,
commitTimeline.lastInstant().get().getTimestamp())) {
+ // After the last compaction so no rollback required as per logic above
+ return true;
+ }
+ LOG.warn("Cannot rollback instant " + instantTime + " because the
corresponding deltacommit has been compacted "
+ + " in " + commitTimeline.lastInstant().get().getTimestamp());
+ return false;
+ } catch (TableNotFoundException e) {
+ // Metadata table is not setup. Nothing to rollback. Exit gracefully.
+ LOG.warn("Cannot rollback instant " + instantTime + " as metadata table
is not found");
+ return true;
+ }
+ }
+
+ @Override
+ public boolean isGlobal() {
+ return true;
+ }
+
+ @Override
+ public boolean canIndexLogFiles() {
+ return true;
+ }
+
+ @Override
+ public boolean isImplicitWithStorage() {
+ return false;
+ }
+
+ private <R> HoodieData<HoodieRecord<R>> tagLocationBackToRecords(
+ HoodiePairData<String, HoodieRecordGlobalLocation> keyFilenamePair,
+ HoodieData<HoodieRecord<R>> records) {
+ HoodiePairData<String, HoodieRecord<R>> keyRecordPairs =
+ records.mapToPair(record -> new ImmutablePair<>(record.getRecordKey(),
record));
+ // Here as the records might have more data than keyFilenamePairs (some
row keys' not found in record index),
+ // we will do left outer join.
+ return keyRecordPairs.leftOuterJoin(keyFilenamePair).values()
+ .map(v -> {
+ HoodieRecord<R> record = v.getLeft();
+ Option<HoodieRecordGlobalLocation> location =
Option.ofNullable(v.getRight().orElse(null));
+ if (!location.isPresent()) {
+ // No location found.
+ return record;
+ }
+ // Ensure the partitionPath is also set correctly in the key
+ if
(!record.getPartitionPath().equals(location.get().getPartitionPath())) {
+ record = new HoodieAvroRecord(new HoodieKey(record.getRecordKey(),
location.get().getPartitionPath()), (HoodieRecordPayload) record.getData());
+ }
+
+ // Perform the tagging. Not using HoodieIndexUtils.getTaggedRecord
to prevent an additional copy which is not necessary for this index.
+ record.unseal();
+ record.setCurrentLocation(location.get());
+ record.seal();
+ return record;
+ });
+ }
+
+ /**
+ * Function that looks up a list of keys in a single shard of the record index.
+ */
+ private static class RecordIndexFileGroupLookupFunction implements
PairFlatMapFunction<Iterator<String>, String, HoodieRecordGlobalLocation> {
+ private final HoodieTable hoodieTable;
+
+ public RecordIndexFileGroupLookupFunction(HoodieTable hoodieTable) {
+ this.hoodieTable = hoodieTable;
+ }
+
+ @Override
+ public Iterator<Tuple2<String, HoodieRecordGlobalLocation>>
call(Iterator<String> recordKeyIterator) {
+ List<String> keysToLookup = new ArrayList<>();
+ recordKeyIterator.forEachRemaining(keysToLookup::add);
+
+ // recordIndexInfo object only contains records that are present in
record_index.
+ Map<String, HoodieRecordGlobalLocation> recordIndexInfo =
hoodieTable.getMetadataTable().readRecordIndex(keysToLookup);
+
+ HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+ HoodieTimeline commitsTimeline =
metaClient.getCommitsTimeline().filterCompletedInstants();
+ return recordIndexInfo.entrySet().stream()
+ .filter(e -> HoodieIndexUtils.checkIfValidCommit(commitsTimeline,
e.getValue().getInstantTime()))
Review Comment:
Checking if the commit is valid might be costly since we are doing it for every
record, so trying to see if we can avoid it.
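The shard-aware partitioning in tagLocation above relies on mapping each record key deterministically to a file group index so all lookups for a key hit the same shard. A minimal sketch of such a mapping (hypothetical; Hudi's actual HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex may hash differently):

```java
public class KeyToFileGroupMapper {
  // Deterministically maps a record key to a file group index in
  // [0, numFileGroups), so repeated lookups for the same key always
  // target the same shard.
  public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
    int h = recordKey.hashCode();
    // Normalize a possibly negative hash into [0, numFileGroups).
    return ((h % numFileGroups) + numFileGroups) % numFileGroups;
  }
}
```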
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
+ // recordIndexInfo object only contains records that are present in
record_index.
+ Map<String, HoodieRecordGlobalLocation> recordIndexInfo =
hoodieTable.getMetadataTable().readRecordIndex(keysToLookup);
+
+ HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+ HoodieTimeline commitsTimeline =
metaClient.getCommitsTimeline().filterCompletedInstants();
+ return recordIndexInfo.entrySet().stream()
+ .filter(e -> HoodieIndexUtils.checkIfValidCommit(commitsTimeline,
e.getValue().getInstantTime()))
Review Comment:
May I know why we need this? We already ensure we ignore non-committed records
while reading log records based on valid instant ranges, so I'm interested to
know what we are trying to guard against in addition here.
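For reference, the left-outer-join semantics of tagLocationBackToRecords can be sketched without Spark (plain Java with illustrative names only): every incoming key survives the join, and only keys found in the record index come back with a location.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LeftOuterTagSketch {
  // Left outer join of record keys against the index: unmatched keys are
  // kept with an empty location (they will be treated as inserts).
  public static Map<String, Optional<String>> tagKeys(
      List<String> recordKeys, Map<String, String> keyToLocation) {
    Map<String, Optional<String>> tagged = new LinkedHashMap<>();
    for (String key : recordKeys) {
      tagged.put(key, Optional.ofNullable(keyToLocation.get(key)));
    }
    return tagged;
  }
}
```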
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -494,14 +597,8 @@ private Pair<Integer, HoodieData<HoodieRecord>>
initializeFilesPartition(String
engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating
records for MDT FILES partition");
HoodieData<HoodieRecord> fileListRecords =
engineContext.parallelize(partitionInfoList,
partitionInfoList.size()).map(partitionInfo -> {
Map<String, Long> fileNameToSizeMap =
partitionInfo.getFileNameToSizeMap();
- // filter for files that are part of the completed commits
- Map<String, Long> validFileNameToSizeMap =
fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {
Review Comment:
I see. we have moved it to L1309 ish
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -143,46 +126,33 @@ public List<String> getAllPartitionPaths() throws
IOException {
* @param partitionPath The absolute path of the partition to list
*/
@Override
- public FileStatus[] getAllFilesInPartition(Path partitionPath)
- throws IOException {
- if (isMetadataTableEnabled) {
- try {
- return fetchAllFilesInPartition(partitionPath);
- } catch (Exception e) {
- throw new HoodieMetadataException("Failed to retrieve files in
partition " + partitionPath + " from metadata", e);
- }
+ public FileStatus[] getAllFilesInPartition(Path partitionPath) throws
IOException {
+ ValidationUtils.checkArgument(isMetadataTableInitialized);
+ try {
+ return fetchAllFilesInPartition(partitionPath);
+ } catch (Exception e) {
+ throw new HoodieMetadataException("Failed to retrieve files in partition
" + partitionPath + " from metadata", e);
}
-
- FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
Review Comment:
For example, the first time metadata is enabled in the write config but the MDT
is yet to be initialized, the FS view used might be instantiating
HoodieBackedTableMetadata.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]