nsivabalan commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r827496666
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ try {
+ initializeFileGroups(dataMetaClient,
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to initialize
file groups for metadata partition: %s, instant: %s", p, instantTime));
+ }
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private List<String> getMetadataPartitionsToUpdate() {
+ // find last (pending or) completed index instant and get partitions (to
be) written
+ Option<HoodieInstant> lastIndexingInstant =
dataMetaClient.getActiveTimeline()
Review comment:
I guess we have to fix this to read from the table properties?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ try {
+ initializeFileGroups(dataMetaClient,
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to initialize
file groups for metadata partition: %s, instant: %s", p, instantTime));
+ }
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private List<String> getMetadataPartitionsToUpdate() {
+ // find last (pending or) completed index instant and get partitions (to
be) written
+ Option<HoodieInstant> lastIndexingInstant =
dataMetaClient.getActiveTimeline()
+
.getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+ if (lastIndexingInstant.isPresent()) {
+ try {
+ // TODO: handle inflight instant, if it is inflight then read from
requested file.
+ HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+
dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+ return
indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+ } catch (IOException e) {
+ LOG.warn("Could not read index plan. Falling back to
FileSystem.exists() check.");
+ return getExistingMetadataPartitions();
+ }
}
+ // TODO: return only enabled partitions
+ return MetadataPartitionType.allPaths();
Review comment:
Why do we return all partitions? What about the following cases:
1. Someone migrated from 0.10 to 0.11, but the files partition was already
present.
2. (1) + 1 new metadata partition was added and is inflight.
3. (2) + the 1 new partition is completed.
Can you help me understand what this method would return in each of these cases?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No pending
index instant found: %s", instantTime)));
+
ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+ String.format("Index instant %s already inflight", instantTime));
+ try {
+ // read HoodieIndexPlan assuming indexInstant is requested
+ // TODO: handle inflight instant, if it is inflight then throw error.
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+ // get all completed instants since the plan completed
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ Stream<HoodieInstant> remainingInstantsToIndex =
table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
Review comment:
Can we move the catch-up indexing to a separate method?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient
dataMetaClient, Metadata
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
- commit(engineContext.parallelize(records, 1),
MetadataPartitionType.FILES.partitionPath(), instantTime,
canTriggerTableService);
+ List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ try {
+ initializeFileGroups(dataMetaClient,
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to initialize
file groups for metadata partition: %s, instant: %s", p, instantTime));
+ }
+ List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
+ commit(engineContext.parallelize(records, 1), p, instantTime,
canTriggerTableService);
+ }
+ });
+ }
+
+ private List<String> getMetadataPartitionsToUpdate() {
+ // find last (pending or) completed index instant and get partitions (to
be) written
+ Option<HoodieInstant> lastIndexingInstant =
dataMetaClient.getActiveTimeline()
+
.getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+ if (lastIndexingInstant.isPresent()) {
+ try {
+ // TODO: handle inflight instant, if it is inflight then read from
requested file.
+ HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+
dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+ return
indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+ } catch (IOException e) {
+ LOG.warn("Could not read index plan. Falling back to
FileSystem.exists() check.");
+ return getExistingMetadataPartitions();
Review comment:
We can't fall back to fetching all partitions, right? Some could be
inflight and not fully completed w.r.t. index building. Or am I missing something?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -620,8 +636,14 @@ private void initializeFileGroups(HoodieTableMetaClient
dataMetaClient, Metadata
LOG.info(String.format("Creating %d file groups for partition %s with base
fileId %s at instant time %s",
fileGroupCount, metadataPartition.getPartitionPath(),
metadataPartition.getFileIdPrefix(), instantTime));
+ HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
+ List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
Option.ofNullable(fsView), metadataPartition.getPartitionPath());
for (int i = 0; i < fileGroupCount; ++i) {
final String fileGroupFileId = String.format("%s%04d",
metadataPartition.getFileIdPrefix(), i);
+ // if a writer or async indexer had already initialized the filegroup
then continue
+ if (!fileSlices.isEmpty() && fileSlices.stream().anyMatch(fileSlice ->
fileGroupFileId.equals(fileSlice.getFileGroupId().getFileId()))) {
+ continue;
Review comment:
Can you help me understand how a partially failed file group
instantiation is handled? Do we clean up all file groups and start from scratch,
or do we continue from where we left off? I mean, when the indexer restarts the
next time around.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ try {
+ initializeFileGroups(dataMetaClient,
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
Review comment:
Why are we initializing file groups here? If I am not wrong, this is
called in the synchronous code path where the data table is looking to apply a
commit to the MDT. With async metadata indexing, wouldn't the scheduling take
responsibility for initializing the file groups?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
Review comment:
How does this work for a table that migrated from, e.g., 0.10.0? They
may not have added the "files" partition to the table properties, right? i.e. the
list of fully completed metadata partitions.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/bloom/BloomFilter.java
##########
@@ -30,6 +34,13 @@
*/
void add(String key);
+ /**
+ * Add secondary key to the {@link BloomFilter}.
+ *
+ * @param keys list of secondary keys to add to the {@link BloomFilter}
+ */
+ void add(@Nonnull List<String> keys);
Review comment:
Can you help me understand the purpose of adding the secondary keys?
I ask because I don't see a method similar to mightContain for secondary keys.
Also, do you think we can give the method a name that conveys secondary keys?
addSecondaryKeys(), maybe.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ try {
+ initializeFileGroups(dataMetaClient,
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to initialize
file groups for metadata partition: %s, instant: %s", p, instantTime));
+ }
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private List<String> getMetadataPartitionsToUpdate() {
+ // find last (pending or) completed index instant and get partitions (to
be) written
+ Option<HoodieInstant> lastIndexingInstant =
dataMetaClient.getActiveTimeline()
+
.getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+ if (lastIndexingInstant.isPresent()) {
+ try {
+ // TODO: handle inflight instant, if it is inflight then read from
requested file.
+ HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+
dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+ return
indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+ } catch (IOException e) {
+ LOG.warn("Could not read index plan. Falling back to
FileSystem.exists() check.");
+ return getExistingMetadataPartitions();
+ }
}
+ // TODO: return only enabled partitions
+ return MetadataPartitionType.allPaths();
+ }
+
+ private List<String> getExistingMetadataPartitions() {
+ return MetadataPartitionType.allPaths().stream()
+ .filter(p -> {
+ try {
+ // TODO: avoid fs.exists() check
+ return
metadataMetaClient.getFs().exists(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(),
p));
+ } catch (IOException e) {
+ return false;
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void index(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+ indexPartitionInfos.forEach(indexPartitionInfo -> {
+ String indexUptoInstantTime = indexPartitionInfo.getIndexUptoInstant();
+ String relativePartitionPath =
indexPartitionInfo.getMetadataPartitionPath();
+ LOG.info(String.format("Creating a new metadata index for partition '%s'
under path %s upto instant %s",
+ relativePartitionPath, metadataWriteConfig.getBasePath(),
indexUptoInstantTime));
+ try {
+ HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(HoodieTableType.MERGE_ON_READ)
+ .setTableName(tableName)
+ .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+ .setPayloadClassName(HoodieMetadataPayload.class.getName())
+ .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
+ .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
+
.setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields())
+
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+ .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
Review comment:
Shouldn't we call initTable only once, when the MDT is getting
instantiated for the first time?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -641,12 +663,22 @@ private void initializeFileGroups(HoodieTableMetaClient
dataMetaClient, Metadata
}
}
+ public void dropIndex(List<MetadataPartitionType> indexesToDrop) throws
IOException {
+ // TODO: update table config and do it in a transaction
Review comment:
please file a tracking ticket if we don't have one.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
##########
@@ -19,17 +19,28 @@
package org.apache.hudi.metadata;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.List;
/**
* Interface that supports updating metadata for a given table, as actions
complete.
*/
public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable
{
+ void index(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo>
indexPartitionInfos);
Review comment:
Please add Javadocs for this method.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Schedules INDEX action.
+ * <li>
+ * 1. Fetch last completed instant on data timeline.
+ * 2. Write the index plan to the <instant>.index.requested.
+ * 3. Initialize filegroups for the enabled partition types within a
transaction.
+ * </li>
+ */
+public class ScheduleIndexActionExecutor<T extends HoodieRecordPayload, I, K,
O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexPlan>> {
+
+ private static final Logger LOG =
LogManager.getLogger(ScheduleIndexActionExecutor.class);
+ private static final Integer INDEX_PLAN_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_PLAN_VERSION =
INDEX_PLAN_VERSION_1;
+
+ private final List<MetadataPartitionType> partitionsToIndex;
+ private final TransactionManager txnManager;
+
+ public ScheduleIndexActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ List<MetadataPartitionType>
partitionsToIndex) {
+ super(context, config, table, instantTime);
+ this.partitionsToIndex = partitionsToIndex;
+ this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ }
+
+ @Override
+ public Option<HoodieIndexPlan> execute() {
+ // validate partitionsToIndex
+ if (!MetadataPartitionType.allPaths().containsAll(partitionsToIndex)) {
+ throw new HoodieIndexException("Not all partitions are valid: " +
partitionsToIndex);
+ }
+ // get last completed instant
+ Option<HoodieInstant> indexUptoInstant =
table.getActiveTimeline().filterCompletedInstants().lastInstant();
+ if (indexUptoInstant.isPresent()) {
+ final HoodieInstant indexInstant =
HoodieTimeline.getIndexRequestedInstant(instantTime);
+ // for each partitionToIndex add that time to the plan
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
partitionsToIndex.stream()
+ .map(p -> new HoodieIndexPartitionInfo(LATEST_INDEX_PLAN_VERSION,
p.getPartitionPath(), indexUptoInstant.get().getTimestamp()))
+ .collect(Collectors.toList());
+ HoodieIndexPlan indexPlan = new
HoodieIndexPlan(LATEST_INDEX_PLAN_VERSION, indexPartitionInfos);
+ try {
+ table.getActiveTimeline().saveToPendingIndexCommit(indexInstant,
TimelineMetadataUtils.serializeIndexPlan(indexPlan));
+ } catch (IOException e) {
+ LOG.error("Error while saving index requested file", e);
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ table.getMetaClient().reloadActiveTimeline();
+
+ // start initializing filegroups
+ // 1. get metadata writer
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ // 2. take a lock --> begin tx (data table)
+ try {
+ this.txnManager.beginTransaction(Option.of(indexInstant),
Option.empty());
+ // 3. initialize filegroups as per plan for the enabled partition types
+ for (MetadataPartitionType partitionType : partitionsToIndex) {
+ metadataWriter.initializeFileGroups(table.getMetaClient(),
partitionType, indexInstant.getTimestamp(), 1);
+ }
+ } catch (IOException e) {
+ LOG.error("Could not initialize file groups");
+ throw new HoodieIOException(e.getMessage(), e);
+ } finally {
+ this.txnManager.endTransaction(Option.of(indexInstant));
+ }
+ return Option.of(indexPlan);
+ }
+ return Option.empty();
Review comment:
If someone triggers this for an empty table, what's the expected
behavior? Do we update tableConfig to indicate that index building is complete?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ try {
+ initializeFileGroups(dataMetaClient,
MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to initialize
file groups for metadata partition: %s, instant: %s", p, instantTime));
+ }
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private List<String> getMetadataPartitionsToUpdate() {
+ // find last (pending or) completed index instant and get partitions (to
be) written
+ Option<HoodieInstant> lastIndexingInstant =
dataMetaClient.getActiveTimeline()
+
.getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+ if (lastIndexingInstant.isPresent()) {
+ try {
+ // TODO: handle inflight instant, if it is inflight then read from
requested file.
+ HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+
dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+ return
indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+ } catch (IOException e) {
+ LOG.warn("Could not read index plan. Falling back to
FileSystem.exists() check.");
+ return getExistingMetadataPartitions();
+ }
}
+ // TODO: return only enabled partitions
+ return MetadataPartitionType.allPaths();
+ }
+
+ private List<String> getExistingMetadataPartitions() {
+ return MetadataPartitionType.allPaths().stream()
+ .filter(p -> {
+ try {
+ // TODO: avoid fs.exists() check
+ return
metadataMetaClient.getFs().exists(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(),
p));
+ } catch (IOException e) {
+ return false;
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void index(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+ indexPartitionInfos.forEach(indexPartitionInfo -> {
+ String indexUptoInstantTime = indexPartitionInfo.getIndexUptoInstant();
+ String relativePartitionPath =
indexPartitionInfo.getMetadataPartitionPath();
+ LOG.info(String.format("Creating a new metadata index for partition '%s'
under path %s upto instant %s",
+ relativePartitionPath, metadataWriteConfig.getBasePath(),
indexUptoInstantTime));
+ try {
+ HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(HoodieTableType.MERGE_ON_READ)
+ .setTableName(tableName)
+ .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+ .setPayloadClassName(HoodieMetadataPayload.class.getName())
+ .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
+ .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
+
.setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields())
+
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+ .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+ initTableMetadata();
+ // this part now moves to scheduling
+ initializeFileGroups(dataMetaClient,
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT)),
indexUptoInstantTime, 1);
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to initialize
file groups for metadata partition: %s, indexUptoInstant: %s",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+
+ // List all partitions in the basePath of the containing dataset
+ LOG.info("Initializing metadata table by using file listings in " +
dataWriteConfig.getBasePath());
+ engineContext.setJobStatus(this.getClass().getSimpleName(),
"MetadataIndex: initializing metadata table by listing files and partitions");
+ List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
+
+ // During bootstrap, the list of files to be committed can be huge. So
creating a HoodieCommitMetadata out of these
+ // large number of files and calling the existing
update(HoodieCommitMetadata) function does not scale well.
+ // Hence, we have a special commit just for the bootstrap scenario.
+ initialCommit(indexUptoInstantTime);
Review comment:
Is this applicable only to the initialization of the first partition in the
metadata table?
If not, for subsequent partitions, shouldn't initialCommit take in a list
of metadata partitions to be initialized?
Sorry, I guess I am missing something here.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
Review comment:
Did we add any additional/explicit metrics for the async metadata indexer,
e.g. time for base file initialization, time for catch-up, etc.?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No pending
index instant found: %s", instantTime)));
+
ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+ String.format("Index instant %s already inflight", instantTime));
+ try {
+ // read HoodieIndexPlan assuming indexInstant is requested
+ // TODO: handle inflight instant, if it is inflight then throw error.
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+ // get all completed instants since the plan completed
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ Stream<HoodieInstant> remainingInstantsToIndex =
table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+ .filter(instant -> instant.isCompleted() &&
HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(),
indexUptoInstant));
+ // reconcile with metadata table timeline
+ String metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
Review comment:
This might need some thought. Let's think about all the different scenarios.
MDT partition1 was already built out.
Index building is then triggered for MDT partition2.
In this case, would compaction kick in just for partition1 in the MDT, or do we
block any compaction in general?
Also, how do we guard the archival in the MDT timeline in this case?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+ private final HoodieIndexer.Config cfg;
+ private TypedProperties props;
+ private final JavaSparkContext jsc;
+ private final HoodieTableMetaClient metaClient;
+
+ public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+ this.cfg = cfg;
+ this.jsc = jsc;
+ this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+ ? UtilHelpers.buildProperties(cfg.configs)
+ : readConfigFromFileSystem(jsc, cfg);
+ this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ }
+
+ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
HoodieIndexer.Config cfg) {
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
+ .getProps(true);
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--base-path", "-sp"}, description = "Base path for
the table", required = true)
+ public String basePath = null;
+ @Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
+ public String tableName = null;
+ @Parameter(names = {"--instant-time", "-it"}, description = "Indexing
Instant time")
+ public String indexInstantTime = null;
+ @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for hoodie insert", required = true)
+ public int parallelism = 1;
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+ public String sparkMaster = null;
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = true)
+ public String sparkMemory = null;
+ @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+ public int retry = 0;
+ @Parameter(names = {"--schedule", "-sc"}, description = "Schedule
indexing")
+ public Boolean runSchedule = false;
+ @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated
index types to be built, e.g. BLOOM,FILES,COLSTATS")
+ public String indexTypes = null;
+ @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set
\"schedule\" to generate an indexing plan; "
+ + "Set \"execute\" to execute the indexing plan at the given instant,
which means --instant-time is required here; "
+ + "Set \"scheduleAndExecute\" to generate an indexing plan first and
execute that plan immediately")
+ public String runningMode = null;
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ @Parameter(names = {"--props"}, description = "path to properties file on
localfs or dfs, with configurations for "
+ + "hoodie client for compacting")
+ public String propsFilePath = null;
+
+ @Parameter(names = {"--hoodie-conf"}, description = "Any configuration
that can be set in the properties file "
+ + "(using the CLI parameter \"--props\") can also be passed command
line using this parameter. This can be repeated",
+ splitter = IdentitySplitter.class)
+ public List<String> configs = new ArrayList<>();
+ }
+
+ public static void main(String[] args) {
+ final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
+ JCommander cmd = new JCommander(cfg, null, args);
+
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+
+ final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" +
cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+ HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
+ int result = indexer.start(cfg.retry);
+ String resultMsg = String.format("Indexing with basePath: %s, tableName:
%s, runningMode: %s",
+ cfg.basePath, cfg.tableName, cfg.runningMode);
+ if (result == -1) {
+ LOG.error(resultMsg + " failed");
+ } else {
+ LOG.info(resultMsg + " success");
+ }
+ jsc.stop();
+ }
+
+ private int start(int retry) {
+ return UtilHelpers.retry(retry, () -> {
+ switch (cfg.runningMode.toLowerCase()) {
+ case SCHEDULE: {
+ LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+ Option<String> instantTime = scheduleIndexing(jsc);
+ int result = instantTime.isPresent() ? 0 : -1;
+ if (result == 0) {
+ LOG.info("The schedule instant time is " + instantTime.get());
+ }
+ return result;
+ }
+ case SCHEDULE_AND_EXECUTE: {
+ LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+ return scheduleAndRunIndexing(jsc);
+ }
+ case EXECUTE: {
+ LOG.info("Running Mode: [" + EXECUTE + "];");
+ return runIndexing(jsc);
+ }
+ default: {
+ LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit
the job directly");
+ return -1;
+ }
+ }
+ }, "Indexer failed");
+ }
+
+ private Option<String> scheduleIndexing(JavaSparkContext jsc) throws
Exception {
+ String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+ try (SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props)) {
+ return doSchedule(client);
+ }
+ }
+
+ private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload>
client) {
+ List<String> partitionsToIndex = Arrays.asList(cfg.indexTypes.split(","));
+ List<MetadataPartitionType> partitionTypes = partitionsToIndex.stream()
+ .map(MetadataPartitionType::valueOf).collect(Collectors.toList());
+ Option<String> indexingInstant = client.scheduleIndexing(partitionTypes);
+ if (!indexingInstant.isPresent()) {
+ LOG.error("Scheduling of index action did not return any instant.");
+ }
+ return indexingInstant;
+ }
+
+ private int runIndexing(JavaSparkContext jsc) throws Exception {
+ String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+ try (SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props)) {
+ if (StringUtils.isNullOrEmpty(cfg.indexInstantTime)) {
+ // Instant time is not specified
+ // Find the earliest scheduled indexing instant for execution
+ Option<HoodieInstant> earliestPendingIndexInstant =
metaClient.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(i -> !(i.isCompleted() || INFLIGHT.equals(i.getState())))
+ .firstInstant();
+ if (earliestPendingIndexInstant.isPresent()) {
+ cfg.indexInstantTime =
earliestPendingIndexInstant.get().getTimestamp();
+ LOG.info("Found the earliest scheduled indexing instant which will
be executed: "
+ + cfg.indexInstantTime);
+ } else {
+ throw new HoodieIndexException("There is no scheduled indexing in
the table.");
+ }
+ }
+ return handleError(client.index(cfg.indexInstantTime));
+ }
+ }
+
+ private int scheduleAndRunIndexing(JavaSparkContext jsc) throws Exception {
+ String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+ try (SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props)) {
+ Option<String> indexingInstantTime = doSchedule(client);
+ if (indexingInstantTime.isPresent()) {
+ return handleError(client.index(indexingInstantTime.get()));
Review comment:
handleResponse may be a better name.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -608,7 +624,7 @@ private void
initializeEnabledFileGroups(HoodieTableMetaClient dataMetaClient, S
* File groups will be named as :
* record-index-bucket-0000, .... -> ..., record-index-bucket-0009
*/
- private void initializeFileGroups(HoodieTableMetaClient dataMetaClient,
MetadataPartitionType metadataPartition, String instantTime,
+ public void initializeFileGroups(HoodieTableMetaClient dataMetaClient,
MetadataPartitionType metadataPartition, String instantTime,
Review comment:
Can we check the bootstrapping code? For example, we check the latest
synced instant in the metadata table and check whether it's already archived in
the data table.
With multiple partitions, each partition could be instantiated at a different
point in time. Can we check all such guards/conditions and ensure they are all
intact with the latest state of the metadata table?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+ private final HoodieIndexer.Config cfg;
+ private TypedProperties props;
+ private final JavaSparkContext jsc;
+ private final HoodieTableMetaClient metaClient;
+
+ public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+ this.cfg = cfg;
+ this.jsc = jsc;
+ this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+ ? UtilHelpers.buildProperties(cfg.configs)
+ : readConfigFromFileSystem(jsc, cfg);
+ this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ }
+
+ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
HoodieIndexer.Config cfg) {
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
+ .getProps(true);
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--base-path", "-sp"}, description = "Base path for
the table", required = true)
+ public String basePath = null;
+ @Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
+ public String tableName = null;
+ @Parameter(names = {"--instant-time", "-it"}, description = "Indexing
Instant time")
+ public String indexInstantTime = null;
+ @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for hoodie insert", required = true)
+ public int parallelism = 1;
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+ public String sparkMaster = null;
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = true)
+ public String sparkMemory = null;
+ @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+ public int retry = 0;
+ @Parameter(names = {"--schedule", "-sc"}, description = "Schedule
indexing")
+ public Boolean runSchedule = false;
+ @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated
index types to be built, e.g. BLOOM,FILES,COLSTATS")
+ public String indexTypes = null;
+ @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set
\"schedule\" to generate an indexing plan; "
Review comment:
How is this different from the runSchedule param? It's a bit confusing.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -889,6 +890,33 @@ public static HoodieTableFileSystemView
getFileSystemView(HoodieTableMetaClient
}
}
+ /**
+ * Get the column names for the table for column stats indexing
+ *
+ * @param recordsGenerationParams - all parameters required to generate
metadata index for enabled index types
+ * @return List of column names for which column stats index is enabled
+ */
+ private static List<String>
getColumnsToIndex(MetadataRecordsGenerationParams recordsGenerationParams) {
+ if (!recordsGenerationParams.isAllColumnStatsIndexEnabled()
+ ||
recordsGenerationParams.getDataMetaClient().getCommitsTimeline().filterCompletedInstants().countInstants()
< 1) {
+ return
Arrays.asList(recordsGenerationParams.getDataMetaClient().getTableConfig().getRecordKeyFieldProp().split(","));
+ }
+
+ if (!recordsGenerationParams.getColumnsToIndex().isEmpty()) {
+ return recordsGenerationParams.getColumnsToIndex();
+ }
+
+ TableSchemaResolver schemaResolver = new
TableSchemaResolver(recordsGenerationParams.getDataMetaClient());
+ // consider nested fields as well. if column stats is enabled only for a
subset of columns,
Review comment:
guess part of the comment can be removed.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+ private final HoodieIndexer.Config cfg;
+ private TypedProperties props;
+ private final JavaSparkContext jsc;
+ private final HoodieTableMetaClient metaClient;
+
+ public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+ this.cfg = cfg;
+ this.jsc = jsc;
+ this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+ ? UtilHelpers.buildProperties(cfg.configs)
+ : readConfigFromFileSystem(jsc, cfg);
+ this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ }
+
+ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
HoodieIndexer.Config cfg) {
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
+ .getProps(true);
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--base-path", "-sp"}, description = "Base path for
the table", required = true)
+ public String basePath = null;
+ @Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
+ public String tableName = null;
+ @Parameter(names = {"--instant-time", "-it"}, description = "Indexing
Instant time")
+ public String indexInstantTime = null;
+ @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for hoodie insert", required = true)
+ public int parallelism = 1;
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+ public String sparkMaster = null;
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = true)
+ public String sparkMemory = null;
+ @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+ public int retry = 0;
+ @Parameter(names = {"--schedule", "-sc"}, description = "Schedule
indexing")
+ public Boolean runSchedule = false;
+ @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated
index types to be built, e.g. BLOOM,FILES,COLSTATS")
+ public String indexTypes = null;
+ @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set
\"schedule\" to generate an indexing plan; "
+ + "Set \"execute\" to execute the indexing plan at the given instant,
which means --instant-time is required here; "
+ + "Set \"scheduleAndExecute\" to generate an indexing plan first and
execute that plan immediately")
+ public String runningMode = null;
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ @Parameter(names = {"--props"}, description = "path to properties file on
localfs or dfs, with configurations for "
+ + "hoodie client for compacting")
Review comment:
minor. "compacting" -> "indexing"
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No pending
index instant found: %s", instantTime)));
+
ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+ String.format("Index instant %s already inflight", instantTime));
+ try {
+ // read HoodieIndexPlan assuming indexInstant is requested
+ // TODO: handle inflight instant, if it is inflight then throw error.
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+ // get all completed instants since the plan completed
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ Stream<HoodieInstant> remainingInstantsToIndex =
table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+ .filter(instant -> instant.isCompleted() &&
HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(),
indexUptoInstant));
+ // reconcile with metadata table timeline
+ String metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+ Set<HoodieInstant> metadataCompletedTimeline =
metadataMetaClient.getActiveTimeline()
+
.getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+ List<HoodieInstant> finalRemainingInstantsToIndex =
remainingInstantsToIndex.map(
+ instant -> new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp())
+ ).filter(instant ->
!metadataCompletedTimeline.contains(instant)).collect(Collectors.toList());
+
+ // index all remaining instants with a timeout
+ ExecutorService executorService =
Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+ Future<?> postRequestIndexingTaskFuture = executorService.submit(new
PostRequestIndexingTask(metadataWriter, finalRemainingInstantsToIndex));
+ try {
+ // TODO: configure timeout
+ postRequestIndexingTaskFuture.get(60, TimeUnit.SECONDS);
Review comment:
60 secs is too short. If there are 100+ instants to catch up, would we
complete in 60 secs?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -641,12 +663,22 @@ private void initializeFileGroups(HoodieTableMetaClient
dataMetaClient, Metadata
}
}
+ public void dropIndex(List<MetadataPartitionType> indexesToDrop) throws
IOException {
+ // TODO: update table config and do it in a transaction
Review comment:
If a writer is holding onto an instance of hoodieTableConfig, it may not
refresh from time to time, right? So, if a partition was deleted mid-way, when
the writer tries to apply a commit to the metadata table, won't
hoodieTableConfig.getMetadataPartitionsToUpdate() return stale values?
Do we ensure such a flow succeeds even if there are partitions to update, but
the actual MD partition is deleted?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Schedules INDEX action.
+ * <li>
+ * 1. Fetch last completed instant on data timeline.
+ * 2. Write the index plan to the <instant>.index.requested.
+ * 3. Initialize filegroups for the enabled partition types within a
transaction.
+ * </li>
+ */
+public class ScheduleIndexActionExecutor<T extends HoodieRecordPayload, I, K,
O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexPlan>> {
+
+ private static final Logger LOG =
LogManager.getLogger(ScheduleIndexActionExecutor.class);
+ private static final Integer INDEX_PLAN_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_PLAN_VERSION =
INDEX_PLAN_VERSION_1;
+
+ private final List<MetadataPartitionType> partitionsToIndex;
+ private final TransactionManager txnManager;
+
+ public ScheduleIndexActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ List<MetadataPartitionType>
partitionsToIndex) {
+ super(context, config, table, instantTime);
+ this.partitionsToIndex = partitionsToIndex;
+ this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ }
+
+ @Override
+ public Option<HoodieIndexPlan> execute() {
+ // validate partitionsToIndex
+ if (!MetadataPartitionType.allPaths().containsAll(partitionsToIndex)) {
+ throw new HoodieIndexException("Not all partitions are valid: " +
partitionsToIndex);
+ }
+ // get last completed instant
+ Option<HoodieInstant> indexUptoInstant =
table.getActiveTimeline().filterCompletedInstants().lastInstant();
+ if (indexUptoInstant.isPresent()) {
+ final HoodieInstant indexInstant =
HoodieTimeline.getIndexRequestedInstant(instantTime);
+ // for each partitionToIndex add that time to the plan
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
partitionsToIndex.stream()
+ .map(p -> new HoodieIndexPartitionInfo(LATEST_INDEX_PLAN_VERSION,
p.getPartitionPath(), indexUptoInstant.get().getTimestamp()))
+ .collect(Collectors.toList());
+ HoodieIndexPlan indexPlan = new
HoodieIndexPlan(LATEST_INDEX_PLAN_VERSION, indexPartitionInfos);
+ try {
+ table.getActiveTimeline().saveToPendingIndexCommit(indexInstant,
TimelineMetadataUtils.serializeIndexPlan(indexPlan));
+ } catch (IOException e) {
+ LOG.error("Error while saving index requested file", e);
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ table.getMetaClient().reloadActiveTimeline();
+
+ // start initializing filegroups
+ // 1. get metadata writer
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ // 2. take a lock --> begin tx (data table)
+ try {
+ this.txnManager.beginTransaction(Option.of(indexInstant),
Option.empty());
+ // 3. initialize filegroups as per plan for the enabled partition types
+ for (MetadataPartitionType partitionType : partitionsToIndex) {
+ metadataWriter.initializeFileGroups(table.getMetaClient(),
partitionType, indexInstant.getTimestamp(), 1);
Review comment:
I guess the last arg should be partitionType.getFileGroupCount().
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+ private final HoodieIndexer.Config cfg;
+ private TypedProperties props;
+ private final JavaSparkContext jsc;
+ private final HoodieTableMetaClient metaClient;
+
+ public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+ this.cfg = cfg;
+ this.jsc = jsc;
+ this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+ ? UtilHelpers.buildProperties(cfg.configs)
+ : readConfigFromFileSystem(jsc, cfg);
+ this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ }
+
+ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
HoodieIndexer.Config cfg) {
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
+ .getProps(true);
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--base-path", "-sp"}, description = "Base path for
the table", required = true)
+ public String basePath = null;
+ @Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
+ public String tableName = null;
+ @Parameter(names = {"--instant-time", "-it"}, description = "Indexing
Instant time")
+ public String indexInstantTime = null;
+ @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for hoodie insert", required = true)
+ public int parallelism = 1;
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+ public String sparkMaster = null;
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = true)
+ public String sparkMemory = null;
+ @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+ public int retry = 0;
+ @Parameter(names = {"--schedule", "-sc"}, description = "Schedule
indexing")
+ public Boolean runSchedule = false;
+ @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated
index types to be built, e.g. BLOOM,FILES,COLSTATS")
Review comment:
+1
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No pending
index instant found: %s", instantTime)));
+
ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+ String.format("Index instant %s already inflight", instantTime));
+ try {
+ // read HoodieIndexPlan assuming indexInstant is requested
+ // TODO: handle inflight instant, if it is inflight then throw error.
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+ // get all completed instants since the plan completed
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ Stream<HoodieInstant> remainingInstantsToIndex =
table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
Review comment:
There could be some instants in the data table timeline that got archived.
Did we consider those scenarios?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No pending
index instant found: %s", instantTime)));
+
ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+ String.format("Index instant %s already inflight", instantTime));
+ try {
+ // read HoodieIndexPlan assuming indexInstant is requested
+ // TODO: handle inflight instant, if it is inflight then throw error.
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+ // get all completed instants since the plan completed
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ Stream<HoodieInstant> remainingInstantsToIndex =
table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+ .filter(instant -> instant.isCompleted() &&
HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(),
indexUptoInstant));
+ // reconcile with metadata table timeline
+ String metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+ Set<HoodieInstant> metadataCompletedTimeline =
metadataMetaClient.getActiveTimeline()
+
.getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+ List<HoodieInstant> finalRemainingInstantsToIndex =
remainingInstantsToIndex.map(
Review comment:
I see we fetch all instants (pending, complete) at L106. So, I assume
finalRemainingInstantsToIndex could contain inflight commits as well. And so,
there are chances that when executing PostRequestIndexingTask, the actual
writer would have already applied the commit to MDT. Have we considered this
scenario?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+ private final HoodieIndexer.Config cfg;
+ private TypedProperties props;
+ private final JavaSparkContext jsc;
+ private final HoodieTableMetaClient metaClient;
+
+ public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+ this.cfg = cfg;
+ this.jsc = jsc;
+ this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+ ? UtilHelpers.buildProperties(cfg.configs)
+ : readConfigFromFileSystem(jsc, cfg);
+ this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ }
+
+ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
HoodieIndexer.Config cfg) {
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
+ .getProps(true);
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--base-path", "-sp"}, description = "Base path for
the table", required = true)
+ public String basePath = null;
+ @Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
+ public String tableName = null;
+ @Parameter(names = {"--instant-time", "-it"}, description = "Indexing
Instant time")
+ public String indexInstantTime = null;
+ @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for hoodie insert", required = true)
+ public int parallelism = 1;
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+ public String sparkMaster = null;
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = true)
+ public String sparkMemory = null;
+ @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+ public int retry = 0;
+ @Parameter(names = {"--schedule", "-sc"}, description = "Schedule
indexing")
+ public Boolean runSchedule = false;
+ @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated
index types to be built, e.g. BLOOM,FILES,COLSTATS")
+ public String indexTypes = null;
+ @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set
\"schedule\" to generate an indexing plan; "
+ + "Set \"execute\" to execute the indexing plan at the given instant,
which means --instant-time is required here; "
+ + "Set \"scheduleAndExecute\" to generate an indexing plan first and
execute that plan immediately")
Review comment:
Is there a need to add a cancelIndexing operation?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -121,6 +121,15 @@ protected void initRegistry() {
}
}
+ @Override
+ protected void scheduleIndex(List<String> partitions) {
+ ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is
not fully initialized yet.");
Review comment:
Can you confirm this: for "files", we always do synchronous
initialization, is that right?
What happens if, during synchronous initialization of the metadata table,
someone schedules "col_stats" partition indexing via the tool? Do we guard the
writes/critical section with a lock?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No pending
index instant found: %s", instantTime)));
+
ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+ String.format("Index instant %s already inflight", instantTime));
+ try {
+ // read HoodieIndexPlan assuming indexInstant is requested
+ // TODO: handle inflight instant, if it is inflight then throw error.
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+ // get all completed instants since the plan completed
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ Stream<HoodieInstant> remainingInstantsToIndex =
table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+ .filter(instant -> instant.isCompleted() &&
HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(),
indexUptoInstant));
+ // reconcile with metadata table timeline
+ String metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+ Set<HoodieInstant> metadataCompletedTimeline =
metadataMetaClient.getActiveTimeline()
+
.getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+ List<HoodieInstant> finalRemainingInstantsToIndex =
remainingInstantsToIndex.map(
+ instant -> new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp())
+ ).filter(instant ->
!metadataCompletedTimeline.contains(instant)).collect(Collectors.toList());
+
+ // index all remaining instants with a timeout
+ ExecutorService executorService =
Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+ Future<?> postRequestIndexingTaskFuture = executorService.submit(new
PostRequestIndexingTask(metadataWriter, finalRemainingInstantsToIndex));
+ try {
+ // TODO: configure timeout
+ postRequestIndexingTaskFuture.get(60, TimeUnit.SECONDS);
+ } catch (TimeoutException | InterruptedException | ExecutionException e)
{
+ postRequestIndexingTaskFuture.cancel(true);
+ } finally {
+ executorService.shutdownNow();
+ }
+ Option<HoodieInstant> lastMetadataInstant =
metadataMetaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+ if (lastMetadataInstant.isPresent() &&
indexUptoInstant.equals(lastMetadataInstant.get().getTimestamp())) {
+ return Option.of(HoodieIndexCommitMetadata.newBuilder()
+
.setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(indexPartitionInfos).build());
+ }
+ List<HoodieIndexPartitionInfo> finalIndexPartitionInfos =
indexPartitionInfos.stream()
+ .map(info -> new HoodieIndexPartitionInfo(
+ info.getVersion(),
+ info.getMetadataPartitionPath(),
+
lastMetadataInstant.get().getTimestamp())).collect(Collectors.toList());
+ return Option.of(HoodieIndexCommitMetadata.newBuilder()
+
.setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build());
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to index instant:
%s", indexInstant));
+ }
+ }
+
+ class PostRequestIndexingTask implements Runnable {
+
+ private final HoodieTableMetadataWriter metadataWriter;
+ private final List<HoodieInstant> instantsToIndex;
+
+ PostRequestIndexingTask(HoodieTableMetadataWriter metadataWriter,
List<HoodieInstant> instantsToIndex) {
+ this.metadataWriter = metadataWriter;
+ this.instantsToIndex = instantsToIndex;
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.interrupted()) {
+ for (HoodieInstant instant : instantsToIndex) {
Review comment:
Don't we need to take a lock here?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No pending
index instant found: %s", instantTime)));
+
ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+ String.format("Index instant %s already inflight", instantTime));
+ try {
+ // read HoodieIndexPlan assuming indexInstant is requested
+ // TODO: handle inflight instant, if it is inflight then throw error.
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+ // get all completed instants since the plan completed
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ Stream<HoodieInstant> remainingInstantsToIndex =
table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+ .filter(instant -> instant.isCompleted() &&
HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(),
indexUptoInstant));
+ // reconcile with metadata table timeline
+ String metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+ Set<HoodieInstant> metadataCompletedTimeline =
metadataMetaClient.getActiveTimeline()
+
.getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+ List<HoodieInstant> finalRemainingInstantsToIndex =
remainingInstantsToIndex.map(
+ instant -> new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp())
+ ).filter(instant ->
!metadataCompletedTimeline.contains(instant)).collect(Collectors.toList());
+
+ // index all remaining instants with a timeout
+ ExecutorService executorService =
Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+ Future<?> postRequestIndexingTaskFuture = executorService.submit(new
PostRequestIndexingTask(metadataWriter, finalRemainingInstantsToIndex));
+ try {
+ // TODO: configure timeout
+ postRequestIndexingTaskFuture.get(60, TimeUnit.SECONDS);
+ } catch (TimeoutException | InterruptedException | ExecutionException e)
{
+ postRequestIndexingTaskFuture.cancel(true);
+ } finally {
+ executorService.shutdownNow();
+ }
+ Option<HoodieInstant> lastMetadataInstant =
metadataMetaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+ if (lastMetadataInstant.isPresent() &&
indexUptoInstant.equals(lastMetadataInstant.get().getTimestamp())) {
+ return Option.of(HoodieIndexCommitMetadata.newBuilder()
+
.setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(indexPartitionInfos).build());
+ }
+ List<HoodieIndexPartitionInfo> finalIndexPartitionInfos =
indexPartitionInfos.stream()
+ .map(info -> new HoodieIndexPartitionInfo(
+ info.getVersion(),
+ info.getMetadataPartitionPath(),
+
lastMetadataInstant.get().getTimestamp())).collect(Collectors.toList());
+ return Option.of(HoodieIndexCommitMetadata.newBuilder()
+
.setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build());
+ } catch (IOException e) {
Review comment:
Sorry, where are we checking for the holes and aborting the index building?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -343,6 +347,16 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext
context, String rollb
deleteInstants, skipLocking).execute();
}
+ @Override
+ public Option<HoodieIndexPlan> scheduleIndex(HoodieEngineContext context,
String indexInstantTime, List<String> partitionsToIndex) {
+ return new ScheduleIndexActionExecutor<>(context, config, this,
indexInstantTime, partitionsToIndex).execute();
Review comment:
This is just one line. I don't think it's a must to move it to the base class;
I will leave it to you, though. We already have similar code across all engines.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No pending
index instant found: %s", instantTime)));
+
ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+ String.format("Index instant %s already inflight", instantTime));
+ try {
+ // read HoodieIndexPlan assuming indexInstant is requested
+ // TODO: handle inflight instant, if it is inflight then throw error.
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
Review comment:
Don't we need locking here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]