nsivabalan commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r835722655
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -915,6 +917,39 @@ public boolean scheduleCompactionAtInstant(String
instantTime, Option<Map<String
return scheduleTableService(instantTime, extraMetadata,
TableServiceType.COMPACT).isPresent();
}
+ public Option<String> scheduleIndexing(List<MetadataPartitionType>
partitionTypes) {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ return scheduleIndexingAtInstant(partitionTypes, instantTime) ?
Option.of(instantTime) : Option.empty();
+ }
+
+ private boolean scheduleIndexingAtInstant(List<MetadataPartitionType>
partitionTypes, String instantTime) throws HoodieIOException {
+ Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf,
config.isMetadataTableEnabled())
+ .scheduleIndex(context, instantTime, partitionTypes);
+ return indexPlan.isPresent();
+ }
+
+ public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
Review comment:
java docs for all public apis
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -663,20 +711,82 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ if (!dataWriteConfig.isMetadataTableEnabled()) {
+ return;
+ }
+ Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private Set<String> getMetadataPartitionsToUpdate() {
+ // fetch partitions to update from table config
+ Set<String> partitionsToUpdate =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
Review comment:
can we expose a single api in tableConfig for
getInflightAndCompleteMetadataIndexes
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -175,6 +182,25 @@
.sinceVersion("0.11.0")
.withDocumentation("Parallelism to use, when generating column stats
index.");
+ public static final ConfigProperty<String> COLUMN_STATS_INDEX_FOR_COLUMNS =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.column.stats.for.columns")
+ .defaultValue("")
+ .sinceVersion("0.11.0")
+ .withDocumentation("Comma-separated list of columns for which column
stats index will be built.");
+
+ public static final ConfigProperty<String> BLOOM_FILTER_INDEX_FOR_COLUMNS =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.bloom.filter.for.columns")
+ .defaultValue("")
+ .sinceVersion("0.11.0")
+ .withDocumentation("Comma-separated list of columns for which bloom
filter index will be built.");
Review comment:
can we enhance docs to say whats the default behavior
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -645,12 +669,36 @@ private void initializeFileGroups(HoodieTableMetaClient
dataMetaClient, Metadata
}
}
+ public void dropIndex(List<MetadataPartitionType> indexesToDrop) throws
IOException {
+ Set<String> completedIndexes =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+ Set<String> inflightIndexes =
Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+ for (MetadataPartitionType partitionType : indexesToDrop) {
+ String partitionPath = partitionType.getPartitionPath();
+ if (inflightIndexes.contains(partitionPath)) {
+ LOG.error("Metadata indexing in progress: " + partitionPath);
+ return;
+ }
+ LOG.warn("Deleting Metadata Table partitions: " + partitionPath);
+ dataMetaClient.getFs().delete(new
Path(metadataWriteConfig.getBasePath(), partitionPath), true);
+ completedIndexes.remove(partitionPath);
+ }
+ // update table config
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_COMPLETED.key(),
String.join(",", completedIndexes));
Review comment:
also, what incase the process crashes in between L684 and L688.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
##########
@@ -112,6 +114,25 @@ public HoodieDefaultTimeline getWriteTimeline() {
return new HoodieDefaultTimeline(instants.stream().filter(s ->
validActions.contains(s.getAction())), details);
}
+ @Override
+ public HoodieDefaultTimeline getContiguousCompletedWriteTimeline() {
Review comment:
can we do something like this
```
HoodieInstant earliestInflight =
getWriteTimeline().filterInflightsAndRequested().firstInstant().get().getTimestamp();
getWriteTimeline().filterCompletedInstants().filter( instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN,
earliestInflight));
```
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -915,6 +917,39 @@ public boolean scheduleCompactionAtInstant(String
instantTime, Option<Map<String
return scheduleTableService(instantTime, extraMetadata,
TableServiceType.COMPACT).isPresent();
}
+ public Option<String> scheduleIndexing(List<MetadataPartitionType>
partitionTypes) {
Review comment:
java docs please for all public apis
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -915,6 +917,39 @@ public boolean scheduleCompactionAtInstant(String
instantTime, Option<Map<String
return scheduleTableService(instantTime, extraMetadata,
TableServiceType.COMPACT).isPresent();
}
+ public Option<String> scheduleIndexing(List<MetadataPartitionType>
partitionTypes) {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ return scheduleIndexingAtInstant(partitionTypes, instantTime) ?
Option.of(instantTime) : Option.empty();
+ }
+
+ private boolean scheduleIndexingAtInstant(List<MetadataPartitionType>
partitionTypes, String instantTime) throws HoodieIOException {
+ Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf,
config.isMetadataTableEnabled())
Review comment:
what happens if someone tries to trigger indexing twice? I expect we
would fail the 2nd trigger conveying that already an indexing is in progress
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -645,12 +669,36 @@ private void initializeFileGroups(HoodieTableMetaClient
dataMetaClient, Metadata
}
}
+ public void dropIndex(List<MetadataPartitionType> indexesToDrop) throws
IOException {
+ Set<String> completedIndexes =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+ Set<String> inflightIndexes =
Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+ for (MetadataPartitionType partitionType : indexesToDrop) {
+ String partitionPath = partitionType.getPartitionPath();
+ if (inflightIndexes.contains(partitionPath)) {
+ LOG.error("Metadata indexing in progress: " + partitionPath);
Review comment:
can't we drop an index while its being built? what incase user wants to
abort the index building ?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -120,7 +120,7 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
// Step 2: Load all involved files as <Partition, filename> pairs
List<Pair<String, BloomIndexFileInfo>> fileInfoList;
if (config.getBloomIndexPruneByRanges()) {
- fileInfoList = (config.getMetadataConfig().isColumnStatsIndexEnabled()
+ fileInfoList = (config.isMetadataColumnStatsIndexEnabled()
Review comment:
is it possible to enable just bloom index partition and disable col
stats ? should we check for both partitions here?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -645,12 +669,36 @@ private void initializeFileGroups(HoodieTableMetaClient
dataMetaClient, Metadata
}
}
+ public void dropIndex(List<MetadataPartitionType> indexesToDrop) throws
IOException {
+ Set<String> completedIndexes =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+ Set<String> inflightIndexes =
Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+ for (MetadataPartitionType partitionType : indexesToDrop) {
+ String partitionPath = partitionType.getPartitionPath();
+ if (inflightIndexes.contains(partitionPath)) {
+ LOG.error("Metadata indexing in progress: " + partitionPath);
+ return;
+ }
+ LOG.warn("Deleting Metadata Table partitions: " + partitionPath);
+ dataMetaClient.getFs().delete(new
Path(metadataWriteConfig.getBasePath(), partitionPath), true);
+ completedIndexes.remove(partitionPath);
+ }
+ // update table config
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_COMPLETED.key(),
String.join(",", completedIndexes));
Review comment:
should we not first update the table config and then delete the
partitions. what incase at L 686, another writer checks table props and tries
to update the index. It will fail since we have deleted the partition at L 684.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##########
@@ -208,6 +208,18 @@
.sinceVersion("0.11.0")
.withDocumentation("Table checksum is used to guard against partial
writes in HDFS. It is added as the last entry in hoodie.properties and then
used to validate while reading table config.");
+ public static final ConfigProperty<String> TABLE_METADATA_INDEX_INFLIGHT =
ConfigProperty
+ .key("hoodie.table.metadata.index.inflight")
+ .noDefaultValue()
+ .sinceVersion("0.11.0")
+ .withDocumentation("Comma-separated list of metadata partitions whose
indexing is in progress.");
+
+ public static final ConfigProperty<String> TABLE_METADATA_INDEX_COMPLETED =
ConfigProperty
+ .key("hoodie.table.metadata.index.completed")
Review comment:
same here
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -915,6 +917,39 @@ public boolean scheduleCompactionAtInstant(String
instantTime, Option<Map<String
return scheduleTableService(instantTime, extraMetadata,
TableServiceType.COMPACT).isPresent();
}
+ public Option<String> scheduleIndexing(List<MetadataPartitionType>
partitionTypes) {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ return scheduleIndexingAtInstant(partitionTypes, instantTime) ?
Option.of(instantTime) : Option.empty();
+ }
+
+ private boolean scheduleIndexingAtInstant(List<MetadataPartitionType>
partitionTypes, String instantTime) throws HoodieIOException {
+ Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf,
config.isMetadataTableEnabled())
+ .scheduleIndex(context, instantTime, partitionTypes);
+ return indexPlan.isPresent();
+ }
+
+ public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
+ return createTable(config, hadoopConf,
config.isMetadataTableEnabled()).index(context, indexInstantTime);
+ }
+
+ public void dropIndex(List<MetadataPartitionType> partitionTypes) {
+ HoodieTable table = createTable(config, hadoopConf);
+ String dropInstant = HoodieActiveTimeline.createNewInstantTime();
+ this.txnManager.beginTransaction();
+ try {
+ context.setJobStatus(this.getClass().getSimpleName(), "Dropping
partitions from metadata table");
+ table.getMetadataWriter(dropInstant).ifPresent(w -> {
+ try {
+ ((HoodieTableMetadataWriter) w).dropIndex(partitionTypes);
+ } catch (IOException e) {
+ LOG.error("Failed to drop metadata index. ", e);
Review comment:
can we throw here.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -663,20 +711,82 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ if (!dataWriteConfig.isMetadataTableEnabled()) {
+ return;
+ }
+ Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private Set<String> getMetadataPartitionsToUpdate() {
+ // fetch partitions to update from table config
+ Set<String> partitionsToUpdate =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+
partitionsToUpdate.addAll(Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet()));
+ if (!partitionsToUpdate.isEmpty()) {
+ return partitionsToUpdate;
}
+ // fallback to update files partition only if table config returned no
partitions
+ partitionsToUpdate.add(MetadataPartitionType.FILES.getPartitionPath());
+ return partitionsToUpdate;
+ }
+
+ @Override
+ public void index(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+ if (indexPartitionInfos.isEmpty()) {
+ LOG.warn("No partition to index in the plan");
+ return;
+ }
+ String indexUptoInstantTime =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ indexPartitionInfos.forEach(indexPartitionInfo -> {
+ String relativePartitionPath =
indexPartitionInfo.getMetadataPartitionPath();
+ LOG.info(String.format("Creating a new metadata index for partition '%s'
under path %s upto instant %s",
+ relativePartitionPath, metadataWriteConfig.getBasePath(),
indexUptoInstantTime));
+ try {
+ // filegroup should have already been initialized while scheduling
index for this partition
+ if (!dataMetaClient.getFs().exists(new
Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) {
+ throw new HoodieIndexException(String.format("File group not
initialized for metadata partition: %s, indexUptoInstant: %s. Looks like index
scheduling failed!",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to check whether
file group is initialized for metadata partition: %s, indexUptoInstant: %s",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+
+ // return early and populate enabledPartitionTypes correctly (check in
initialCommit)
+ MetadataPartitionType partitionType =
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
+ if (!enabledPartitionTypes.contains(partitionType)) {
+ throw new HoodieIndexException(String.format("Indexing for metadata
partition: %s is not enabled", partitionType));
+ }
+ });
+ // before initial commit update table config
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(),
indexPartitionInfos.stream()
+
.map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.joining(",")));
+ HoodieTableConfig.update(dataMetaClient.getFs(), new
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
+ // check here for enabled partition types whether filegroups initialized
or not
+ initialCommit(indexUptoInstantTime);
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(),
"");
Review comment:
lets use constant for empty string.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -663,20 +711,82 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ if (!dataWriteConfig.isMetadataTableEnabled()) {
+ return;
+ }
+ Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private Set<String> getMetadataPartitionsToUpdate() {
+ // fetch partitions to update from table config
+ Set<String> partitionsToUpdate =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+
partitionsToUpdate.addAll(Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet()));
+ if (!partitionsToUpdate.isEmpty()) {
+ return partitionsToUpdate;
}
+ // fallback to update files partition only if table config returned no
partitions
+ partitionsToUpdate.add(MetadataPartitionType.FILES.getPartitionPath());
+ return partitionsToUpdate;
+ }
+
+ @Override
+ public void index(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+ if (indexPartitionInfos.isEmpty()) {
+ LOG.warn("No partition to index in the plan");
+ return;
+ }
+ String indexUptoInstantTime =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ indexPartitionInfos.forEach(indexPartitionInfo -> {
+ String relativePartitionPath =
indexPartitionInfo.getMetadataPartitionPath();
+ LOG.info(String.format("Creating a new metadata index for partition '%s'
under path %s upto instant %s",
+ relativePartitionPath, metadataWriteConfig.getBasePath(),
indexUptoInstantTime));
+ try {
+ // filegroup should have already been initialized while scheduling
index for this partition
+ if (!dataMetaClient.getFs().exists(new
Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) {
+ throw new HoodieIndexException(String.format("File group not
initialized for metadata partition: %s, indexUptoInstant: %s. Looks like index
scheduling failed!",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to check whether
file group is initialized for metadata partition: %s, indexUptoInstant: %s",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+
+ // return early and populate enabledPartitionTypes correctly (check in
initialCommit)
+ MetadataPartitionType partitionType =
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
+ if (!enabledPartitionTypes.contains(partitionType)) {
+ throw new HoodieIndexException(String.format("Indexing for metadata
partition: %s is not enabled", partitionType));
+ }
+ });
+ // before initial commit update table config
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(),
indexPartitionInfos.stream()
Review comment:
do you think we should append new entries to already existing value in
table config here?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.INDEX_ACTION;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ // we use this to update the latest instant in data timeline that has been
indexed in metadata table
+ // this needs to be volatile as it can be updated in the IndexingCheckTask
spawned by this executor
+ // assumption is that only one indexer can execute at a time
+ private volatile String currentIndexedInstant;
+
+ private final TransactionManager txnManager;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ // ensure lock provider configured
+ if
(!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() ||
StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
Review comment:
do we need to add this check else where for regular writers? I mean,
what incase user adds these configs just for the async indexer process, but
misses to add them to regular writers?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
##########
@@ -112,6 +114,25 @@ public HoodieDefaultTimeline getWriteTimeline() {
return new HoodieDefaultTimeline(instants.stream().filter(s ->
validActions.contains(s.getAction())), details);
}
+ @Override
+ public HoodieDefaultTimeline getContiguousCompletedWriteTimeline() {
Review comment:
do we have UTs for these
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##########
@@ -208,6 +208,18 @@
.sinceVersion("0.11.0")
.withDocumentation("Table checksum is used to guard against partial
writes in HDFS. It is added as the last entry in hoodie.properties and then
used to validate while reading table config.");
+ public static final ConfigProperty<String> TABLE_METADATA_INDEX_INFLIGHT =
ConfigProperty
+ .key("hoodie.table.metadata.index.inflight")
Review comment:
should we make it plural.
...metadata.indexes.inflight
or
... metadata.indices.inflight
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -663,20 +711,82 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ if (!dataWriteConfig.isMetadataTableEnabled()) {
+ return;
+ }
+ Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private Set<String> getMetadataPartitionsToUpdate() {
+ // fetch partitions to update from table config
+ Set<String> partitionsToUpdate =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+
partitionsToUpdate.addAll(Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet()));
+ if (!partitionsToUpdate.isEmpty()) {
+ return partitionsToUpdate;
}
+ // fallback to update files partition only if table config returned no
partitions
+ partitionsToUpdate.add(MetadataPartitionType.FILES.getPartitionPath());
+ return partitionsToUpdate;
+ }
+
+ @Override
+ public void index(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+ if (indexPartitionInfos.isEmpty()) {
+ LOG.warn("No partition to index in the plan");
+ return;
+ }
+ String indexUptoInstantTime =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ indexPartitionInfos.forEach(indexPartitionInfo -> {
+ String relativePartitionPath =
indexPartitionInfo.getMetadataPartitionPath();
+ LOG.info(String.format("Creating a new metadata index for partition '%s'
under path %s upto instant %s",
+ relativePartitionPath, metadataWriteConfig.getBasePath(),
indexUptoInstantTime));
+ try {
+ // filegroup should have already been initialized while scheduling
index for this partition
+ if (!dataMetaClient.getFs().exists(new
Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) {
+ throw new HoodieIndexException(String.format("File group not
initialized for metadata partition: %s, indexUptoInstant: %s. Looks like index
scheduling failed!",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to check whether
file group is initialized for metadata partition: %s, indexUptoInstant: %s",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+
+ // return early and populate enabledPartitionTypes correctly (check in
initialCommit)
+ MetadataPartitionType partitionType =
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
+ if (!enabledPartitionTypes.contains(partitionType)) {
+ throw new HoodieIndexException(String.format("Indexing for metadata
partition: %s is not enabled", partitionType));
+ }
+ });
+ // before initial commit update table config
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(),
indexPartitionInfos.stream()
+
.map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.joining(",")));
+ HoodieTableConfig.update(dataMetaClient.getFs(), new
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
+ // check here for enabled partition types whether filegroups initialized
or not
+ initialCommit(indexUptoInstantTime);
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(),
"");
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_COMPLETED.key(),
indexPartitionInfos.stream()
Review comment:
not sure if this is the right place to update table config. I was
expecting we will update it towards the end after doing the catch up and
ensuring all commits are caught up. Lets sync up f2f on this. may be I am
missing something.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -175,6 +182,25 @@
.sinceVersion("0.11.0")
.withDocumentation("Parallelism to use, when generating column stats
index.");
+ public static final ConfigProperty<String> COLUMN_STATS_INDEX_FOR_COLUMNS =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.column.stats.for.columns")
+ .defaultValue("")
+ .sinceVersion("0.11.0")
+ .withDocumentation("Comma-separated list of columns for which column
stats index will be built.");
Review comment:
can we enhance docs to say whats default behavior.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
+
+/**
+ * Schedules INDEX action.
+ * <li>
+ * 1. Fetch last completed instant on data timeline.
+ * 2. Write the index plan to the <instant>.index.requested.
+ * 3. Initialize filegroups for the enabled partition types within a
transaction.
+ * </li>
+ */
+public class ScheduleIndexActionExecutor<T extends HoodieRecordPayload, I, K,
O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexPlan>> {
+
+ private static final Logger LOG =
LogManager.getLogger(ScheduleIndexActionExecutor.class);
+ private static final Integer INDEX_PLAN_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_PLAN_VERSION =
INDEX_PLAN_VERSION_1;
+
+ private final List<MetadataPartitionType> partitionsToIndex;
+ private final TransactionManager txnManager;
+
+ public ScheduleIndexActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ List<MetadataPartitionType>
partitionsToIndex) {
+ super(context, config, table, instantTime);
+ this.partitionsToIndex = partitionsToIndex;
+ this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ }
+
+ @Override
+ public Option<HoodieIndexPlan> execute() {
+ // validate partitionsToIndex
+ if
(!EnumSet.allOf(MetadataPartitionType.class).containsAll(partitionsToIndex)) {
+ throw new HoodieIndexException("Not all partitions are valid: " +
partitionsToIndex);
+ }
+ // ensure lock provider configured
+ if
(!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() ||
StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+ throw new HoodieIndexException(String.format("Need to set %s as %s and
configure lock provider class",
+ WRITE_CONCURRENCY_MODE.key(),
OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+ }
+ // make sure that it is idempotent, check with previously pending index
operations.
+ Set<String> indexesInflightOrCompleted =
Stream.of(table.getMetaClient().getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+
indexesInflightOrCompleted.addAll(Stream.of(table.getMetaClient().getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet()));
+ Set<String> requestedPartitions =
partitionsToIndex.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+ requestedPartitions.retainAll(indexesInflightOrCompleted);
+ if (!requestedPartitions.isEmpty()) {
+ LOG.error("Following partitions already exist or inflight: " +
requestedPartitions);
+ return Option.empty();
+ }
+ // get last completed instant
+ Option<HoodieInstant> indexUptoInstant =
table.getActiveTimeline().getContiguousCompletedWriteTimeline().lastInstant();
+ if (indexUptoInstant.isPresent()) {
+ final HoodieInstant indexInstant =
HoodieTimeline.getIndexRequestedInstant(instantTime);
+ // for each partitionToIndex add that time to the plan
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
partitionsToIndex.stream()
+ .map(p -> new HoodieIndexPartitionInfo(LATEST_INDEX_PLAN_VERSION,
p.getPartitionPath(), indexUptoInstant.get().getTimestamp()))
+ .collect(Collectors.toList());
+ HoodieIndexPlan indexPlan = new
HoodieIndexPlan(LATEST_INDEX_PLAN_VERSION, indexPartitionInfos);
+ try {
+ table.getActiveTimeline().saveToPendingIndexCommit(indexInstant,
TimelineMetadataUtils.serializeIndexPlan(indexPlan));
+ } catch (IOException e) {
+ LOG.error("Error while saving index requested file", e);
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ table.getMetaClient().reloadActiveTimeline();
+
+ // start initializing filegroups
+ // 1. get metadata writer
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to initialize filegroups for indexing for instant: %s",
instantTime)));
+ // 2. take a lock --> begin tx (data table)
+ try {
+ this.txnManager.beginTransaction(Option.of(indexInstant),
Option.empty());
+ // 3. initialize filegroups as per plan for the enabled partition types
+ metadataWriter.scheduleIndex(table.getMetaClient(), partitionsToIndex,
indexInstant.getTimestamp());
+ } catch (IOException e) {
+ LOG.error("Could not initialize file groups");
Review comment:
can we suffix exception to the error log.
LOG.error("Could not initialize file groups", e);
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
+
+/**
+ * Schedules INDEX action.
+ * <li>
+ * 1. Fetch last completed instant on data timeline.
+ * 2. Write the index plan to the <instant>.index.requested.
+ * 3. Initialize filegroups for the enabled partition types within a
transaction.
+ * </li>
+ */
+public class ScheduleIndexActionExecutor<T extends HoodieRecordPayload, I, K,
O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexPlan>> {
+
+ private static final Logger LOG =
LogManager.getLogger(ScheduleIndexActionExecutor.class);
+ private static final Integer INDEX_PLAN_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_PLAN_VERSION =
INDEX_PLAN_VERSION_1;
+
+ private final List<MetadataPartitionType> partitionsToIndex;
+ private final TransactionManager txnManager;
+
+ public ScheduleIndexActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ List<MetadataPartitionType>
partitionsToIndex) {
+ super(context, config, table, instantTime);
+ this.partitionsToIndex = partitionsToIndex;
+ this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ }
+
+ @Override
+ public Option<HoodieIndexPlan> execute() {
+ // validate partitionsToIndex
+ if
(!EnumSet.allOf(MetadataPartitionType.class).containsAll(partitionsToIndex)) {
+ throw new HoodieIndexException("Not all partitions are valid: " +
partitionsToIndex);
+ }
+ // ensure lock provider configured
+ if
(!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() ||
StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+ throw new HoodieIndexException(String.format("Need to set %s as %s and
configure lock provider class",
+ WRITE_CONCURRENCY_MODE.key(),
OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+ }
+ // make sure that it is idempotent, check with previously pending index
operations.
+ Set<String> indexesInflightOrCompleted =
Stream.of(table.getMetaClient().getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+
indexesInflightOrCompleted.addAll(Stream.of(table.getMetaClient().getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet()));
+ Set<String> requestedPartitions =
partitionsToIndex.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+ requestedPartitions.retainAll(indexesInflightOrCompleted);
+ if (!requestedPartitions.isEmpty()) {
+ LOG.error("Following partitions already exist or inflight: " +
requestedPartitions);
+ return Option.empty();
+ }
+ // get last completed instant
+ Option<HoodieInstant> indexUptoInstant =
table.getActiveTimeline().getContiguousCompletedWriteTimeline().lastInstant();
+ if (indexUptoInstant.isPresent()) {
+ final HoodieInstant indexInstant =
HoodieTimeline.getIndexRequestedInstant(instantTime);
+ // for each partitionToIndex add that time to the plan
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
partitionsToIndex.stream()
+ .map(p -> new HoodieIndexPartitionInfo(LATEST_INDEX_PLAN_VERSION,
p.getPartitionPath(), indexUptoInstant.get().getTimestamp()))
+ .collect(Collectors.toList());
+ HoodieIndexPlan indexPlan = new
HoodieIndexPlan(LATEST_INDEX_PLAN_VERSION, indexPartitionInfos);
+ try {
+ table.getActiveTimeline().saveToPendingIndexCommit(indexInstant,
TimelineMetadataUtils.serializeIndexPlan(indexPlan));
+ } catch (IOException e) {
+ LOG.error("Error while saving index requested file", e);
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ table.getMetaClient().reloadActiveTimeline();
+
+ // start initializing filegroups
+ // 1. get metadata writer
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to initialize filegroups for indexing for instant: %s",
instantTime)));
+ // 2. take a lock --> begin tx (data table)
+ try {
+ this.txnManager.beginTransaction(Option.of(indexInstant),
Option.empty());
+ // 3. initialize filegroups as per plan for the enabled partition types
+ metadataWriter.scheduleIndex(table.getMetaClient(), partitionsToIndex,
indexInstant.getTimestamp());
Review comment:
I thought we discussed that file groups should be initialized before
adding the requested instant to timeline. So that, whenever any writer sees a
new index being built, the file groups are already built out. but here I see we
create the requested meta file in L 112 and then initialize file groups here.
can you help me understand please
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.INDEX_ACTION;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ // we use this to update the latest instant in data timeline that has been
indexed in metadata table
+ // this needs to be volatile as it can be updated in the IndexingCheckTask
spawned by this executor
+ // assumption is that only one indexer can execute at a time
+ private volatile String currentIndexedInstant;
+
+ private final TransactionManager txnManager;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ // ensure lock provider configured
+ if
(!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() ||
StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+ throw new HoodieIndexException(String.format("Need to set %s as %s and
configure lock provider class",
+ WRITE_CONCURRENCY_MODE.key(),
OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+ }
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime) &&
REQUESTED.equals(instant.getState()))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No
requested index instant found: %s", instantTime)));
+ try {
+ // read HoodieIndexPlan
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+
+ // get all instants since the plan completed (both from active timeline
and archived timeline)
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ table.getMetaClient().reloadActiveTimeline();
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ List<HoodieInstant> instantsToIndex =
getRemainingArchivedAndActiveInstantsSince(indexUptoInstant,
table.getMetaClient());
+
+ // reconcile with metadata table timeline
+ String metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+ Set<String> metadataCompletedTimestamps =
getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant,
metadataMetaClient).stream()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+ // index all remaining instants with a timeout
+ currentIndexedInstant = indexUptoInstant;
+ ExecutorService executorService =
Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+ Future<?> postRequestIndexingTaskFuture = executorService.submit(
+ new IndexingCheckTask(metadataWriter, instantsToIndex,
metadataCompletedTimestamps, table.getMetaClient()));
+ try {
+ postRequestIndexingTaskFuture.get(config.getIndexingCheckTimeout(),
TimeUnit.SECONDS);
Review comment:
lets name the getter with units.
config.getIndexingCheckTimeoutSecs()
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.jetbrains.annotations.TestOnly;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+/**
+ * A tool to run metadata indexing asynchronously.
+ * <p>
+ * Example command (assuming indexer.properties contains related index
configs, see {@link org.apache.hudi.common.config.HoodieMetadataConfig} for
configs):
+ * <p>
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieIndexer \
+ *
/path/to/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar
\
+ * --props /path/to/indexer.properties \
+ * --mode scheduleAndExecute \
+ * --base-path /tmp/hudi_trips_cow \
+ * --table-name hudi_trips_cow \
+ * --index-types COLUMN_STATS \
+ * --parallelism 1 \
+ * --spark-memory 1g
+ * <p>
+ * A sample indexer.properties file:
+ * <p>
+ * hoodie.metadata.index.async=true
+ * hoodie.metadata.index.column.stats.enable=true
+ * hoodie.metadata.index.check.timeout.seconds=60
+ * hoodie.write.concurrency.mode=optimistic_concurrency_control
+ *
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
+ */
+public class HoodieIndexer {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+ private static final String DROP_INDEX = "dropindex";
+
+ private final HoodieIndexer.Config cfg;
+ private TypedProperties props;
+ private final JavaSparkContext jsc;
+ private final HoodieTableMetaClient metaClient;
+
+ public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+ this.cfg = cfg;
+ this.jsc = jsc;
+ this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+ ? UtilHelpers.buildProperties(cfg.configs)
+ : readConfigFromFileSystem(jsc, cfg);
+ this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ }
+
+ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
HoodieIndexer.Config cfg) {
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
+ .getProps(true);
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--base-path", "-sp"}, description = "Base path for
the table", required = true)
+ public String basePath = null;
+ @Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
+ public String tableName = null;
+ @Parameter(names = {"--instant-time", "-it"}, description = "Indexing
Instant time")
+ public String indexInstantTime = null;
+ @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for hoodie insert", required = true)
+ public int parallelism = 1;
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+ public String sparkMaster = null;
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = true)
+ public String sparkMemory = null;
+ @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+ public int retry = 0;
+ @Parameter(names = {"--index-types", "-ixt"}, description =
"Comma-separated index types to be built, e.g. BLOOM_FILTERS,COLUMN_STATS",
required = true)
+ public String indexTypes = null;
+ @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set
\"schedule\" to generate an indexing plan; "
+ + "Set \"execute\" to execute the indexing plan at the given instant,
which means --instant-time is required here; "
+ + "Set \"scheduleandExecute\" to generate an indexing plan first and
execute that plan immediately;"
+ + "Set \"dropindex\" to drop the index types specified in
--index-types;")
+ public String runningMode = null;
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ @Parameter(names = {"--props"}, description = "path to properties file on
localfs or dfs, with configurations for hoodie client for indexing")
+ public String propsFilePath = null;
+
+ @Parameter(names = {"--hoodie-conf"}, description = "Any configuration
that can be set in the properties file "
+ + "(using the CLI parameter \"--props\") can also be passed command
line using this parameter. This can be repeated",
+ splitter = IdentitySplitter.class)
+ public List<String> configs = new ArrayList<>();
+ }
+
+ public static void main(String[] args) {
+ final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
+ JCommander cmd = new JCommander(cfg, null, args);
+
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+
+ final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" +
cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+ HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
+ int result = indexer.start(cfg.retry);
+ String resultMsg = String.format("Indexing with basePath: %s, tableName:
%s, runningMode: %s",
+ cfg.basePath, cfg.tableName, cfg.runningMode);
+ if (result == -1) {
+ LOG.error(resultMsg + " failed");
+ } else {
+ LOG.info(resultMsg + " success");
+ }
+ jsc.stop();
+ }
+
+ public int start(int retry) {
+ return UtilHelpers.retry(retry, () -> {
+ switch (cfg.runningMode.toLowerCase()) {
+ case SCHEDULE: {
+ LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+ Option<String> instantTime = scheduleIndexing(jsc);
+ int result = instantTime.isPresent() ? 0 : -1;
+ if (result == 0) {
+ LOG.info("The schedule instant time is " + instantTime.get());
+ }
+ return result;
+ }
+ case SCHEDULE_AND_EXECUTE: {
+ LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+ return scheduleAndRunIndexing(jsc);
+ }
+ case EXECUTE: {
+ LOG.info("Running Mode: [" + EXECUTE + "];");
+ return runIndexing(jsc);
+ }
+ case DROP_INDEX: {
+ LOG.info("Running Mode: [" + DROP_INDEX + "];");
+ return dropIndex(jsc);
+ }
+ default: {
+ LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit
the job directly");
+ return -1;
+ }
+ }
+ }, "Indexer failed");
+ }
+
+ @TestOnly
+ public Option<String> doSchedule() throws Exception {
+ return this.scheduleIndexing(jsc);
+ }
+
+ private Option<String> scheduleIndexing(JavaSparkContext jsc) throws
Exception {
+ String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+ try (SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props)) {
+ return doSchedule(client);
+ }
+ }
+
+ private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload>
client) {
+ List<String> partitionsToIndex = Arrays.asList(cfg.indexTypes.split(","));
+ List<MetadataPartitionType> partitionTypes = partitionsToIndex.stream()
+ .map(p -> MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)))
+ .collect(Collectors.toList());
+ if (cfg.indexInstantTime != null) {
+ client.scheduleClusteringAtInstant(cfg.indexInstantTime, Option.empty());
+ return Option.of(cfg.indexInstantTime);
+ }
+ Option<String> indexingInstant = client.scheduleIndexing(partitionTypes);
+ if (!indexingInstant.isPresent()) {
+ LOG.error("Scheduling of index action did not return any instant.");
+ }
+ return indexingInstant;
+ }
+
+ private int runIndexing(JavaSparkContext jsc) throws Exception {
+ String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+ try (SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props)) {
+ if (StringUtils.isNullOrEmpty(cfg.indexInstantTime)) {
+ // Instant time is not specified
+ // Find the earliest scheduled indexing instant for execution
+ Option<HoodieInstant> earliestPendingIndexInstant =
metaClient.getActiveTimeline()
Review comment:
again. what incase there are two processes which scheduled index
building for two diff partitions. I feel runIndexing should take in the list of
partitions to be built and fetch the earliest instant pertaining to that ?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -621,8 +635,14 @@ private void initializeFileGroups(HoodieTableMetaClient
dataMetaClient, Metadata
LOG.info(String.format("Creating %d file groups for partition %s with base
fileId %s at instant time %s",
fileGroupCount, metadataPartition.getPartitionPath(),
metadataPartition.getFileIdPrefix(), instantTime));
+ HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
+ List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
Option.ofNullable(fsView), metadataPartition.getPartitionPath());
for (int i = 0; i < fileGroupCount; ++i) {
final String fileGroupFileId = String.format("%s%04d",
metadataPartition.getFileIdPrefix(), i);
+ // if a writer or async indexer had already initialized the filegroup
then continue
+ if (!fileSlices.isEmpty() && fileSlices.stream().anyMatch(fileSlice ->
fileGroupFileId.equals(fileSlice.getFileGroupId().getFileId()))) {
Review comment:
with latest code, I guess fileGroups are initialized by the index
schedule. So, regular writers by the time they see a new index being built,
file groups should have been instantiated. given this, can you help me
understand how this scenario could be possible i.e. already someone initialized
a file group, but this process again calls initializeFileGroups. Ideally
initializeFileGroups should be called just once per MDT partition right ?or am
I missing something.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.jetbrains.annotations.TestOnly;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+/**
+ * A tool to run metadata indexing asynchronously.
+ * <p>
+ * Example command (assuming indexer.properties contains related index
configs, see {@link org.apache.hudi.common.config.HoodieMetadataConfig} for
configs):
+ * <p>
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieIndexer \
+ *
/path/to/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar
\
+ * --props /path/to/indexer.properties \
+ * --mode scheduleAndExecute \
+ * --base-path /tmp/hudi_trips_cow \
+ * --table-name hudi_trips_cow \
+ * --index-types COLUMN_STATS \
+ * --parallelism 1 \
+ * --spark-memory 1g
+ * <p>
+ * A sample indexer.properties file:
+ * <p>
+ * hoodie.metadata.index.async=true
+ * hoodie.metadata.index.column.stats.enable=true
+ * hoodie.metadata.index.check.timeout.seconds=60
+ * hoodie.write.concurrency.mode=optimistic_concurrency_control
+ *
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
+ */
+public class HoodieIndexer {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+ private static final String DROP_INDEX = "dropindex";
+
+ private final HoodieIndexer.Config cfg;
+ private TypedProperties props;
+ private final JavaSparkContext jsc;
+ private final HoodieTableMetaClient metaClient;
+
+ public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+ this.cfg = cfg;
+ this.jsc = jsc;
+ this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+ ? UtilHelpers.buildProperties(cfg.configs)
+ : readConfigFromFileSystem(jsc, cfg);
+ this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ }
+
+ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
HoodieIndexer.Config cfg) {
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
+ .getProps(true);
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--base-path", "-sp"}, description = "Base path for
the table", required = true)
+ public String basePath = null;
+ @Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
+ public String tableName = null;
+ @Parameter(names = {"--instant-time", "-it"}, description = "Indexing
Instant time")
+ public String indexInstantTime = null;
+ @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for hoodie insert", required = true)
+ public int parallelism = 1;
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+ public String sparkMaster = null;
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = true)
+ public String sparkMemory = null;
+ @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+ public int retry = 0;
+ @Parameter(names = {"--index-types", "-ixt"}, description =
"Comma-separated index types to be built, e.g. BLOOM_FILTERS,COLUMN_STATS",
required = true)
+ public String indexTypes = null;
+ @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set
\"schedule\" to generate an indexing plan; "
+ + "Set \"execute\" to execute the indexing plan at the given instant,
which means --instant-time is required here; "
+ + "Set \"scheduleandExecute\" to generate an indexing plan first and
execute that plan immediately;"
+ + "Set \"dropindex\" to drop the index types specified in
--index-types;")
+ public String runningMode = null;
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ @Parameter(names = {"--props"}, description = "path to properties file on
localfs or dfs, with configurations for hoodie client for indexing")
+ public String propsFilePath = null;
+
+ @Parameter(names = {"--hoodie-conf"}, description = "Any configuration
that can be set in the properties file "
+ + "(using the CLI parameter \"--props\") can also be passed command
line using this parameter. This can be repeated",
+ splitter = IdentitySplitter.class)
+ public List<String> configs = new ArrayList<>();
+ }
+
+ public static void main(String[] args) {
+ final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
+ JCommander cmd = new JCommander(cfg, null, args);
+
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+
+ final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" +
cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+ HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
+ int result = indexer.start(cfg.retry);
+ String resultMsg = String.format("Indexing with basePath: %s, tableName:
%s, runningMode: %s",
+ cfg.basePath, cfg.tableName, cfg.runningMode);
+ if (result == -1) {
+ LOG.error(resultMsg + " failed");
+ } else {
+ LOG.info(resultMsg + " success");
+ }
+ jsc.stop();
+ }
+
+ public int start(int retry) {
+ return UtilHelpers.retry(retry, () -> {
+ switch (cfg.runningMode.toLowerCase()) {
+ case SCHEDULE: {
+ LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+ Option<String> instantTime = scheduleIndexing(jsc);
+ int result = instantTime.isPresent() ? 0 : -1;
+ if (result == 0) {
+ LOG.info("The schedule instant time is " + instantTime.get());
+ }
+ return result;
+ }
+ case SCHEDULE_AND_EXECUTE: {
+ LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+ return scheduleAndRunIndexing(jsc);
+ }
+ case EXECUTE: {
+ LOG.info("Running Mode: [" + EXECUTE + "];");
+ return runIndexing(jsc);
+ }
+ case DROP_INDEX: {
+ LOG.info("Running Mode: [" + DROP_INDEX + "];");
+ return dropIndex(jsc);
+ }
+ default: {
+ LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit
the job directly");
+ return -1;
+ }
+ }
+ }, "Indexer failed");
+ }
+
+ @TestOnly
+ public Option<String> doSchedule() throws Exception {
+ return this.scheduleIndexing(jsc);
+ }
+
+ private Option<String> scheduleIndexing(JavaSparkContext jsc) throws
Exception {
+ String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+ try (SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props)) {
+ return doSchedule(client);
+ }
+ }
+
+ private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload>
client) {
+ List<String> partitionsToIndex = Arrays.asList(cfg.indexTypes.split(","));
+ List<MetadataPartitionType> partitionTypes = partitionsToIndex.stream()
+ .map(p -> MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)))
+ .collect(Collectors.toList());
+ if (cfg.indexInstantTime != null) {
Review comment:
!StringUtils.isNullOrEmpty()
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.INDEX_ACTION;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ // we use this to update the latest instant in data timeline that has been
indexed in metadata table
+ // this needs to be volatile as it can be updated in the IndexingCheckTask
spawned by this executor
+ // assumption is that only one indexer can execute at a time
+ private volatile String currentIndexedInstant;
+
+ private final TransactionManager txnManager;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ // ensure lock provider configured
+ if
(!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() ||
StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+ throw new HoodieIndexException(String.format("Need to set %s as %s and
configure lock provider class",
+ WRITE_CONCURRENCY_MODE.key(),
OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+ }
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime) &&
REQUESTED.equals(instant.getState()))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No
requested index instant found: %s", instantTime)));
+ try {
+ // read HoodieIndexPlan
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+
+ // get all instants since the plan completed (both from active timeline
and archived timeline)
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ table.getMetaClient().reloadActiveTimeline();
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ List<HoodieInstant> instantsToIndex =
getRemainingArchivedAndActiveInstantsSince(indexUptoInstant,
table.getMetaClient());
+
+ // reconcile with metadata table timeline
+ String metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+ Set<String> metadataCompletedTimestamps =
getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant,
metadataMetaClient).stream()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+ // index all remaining instants with a timeout
+ currentIndexedInstant = indexUptoInstant;
+ ExecutorService executorService =
Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+ Future<?> postRequestIndexingTaskFuture = executorService.submit(
+ new IndexingCheckTask(metadataWriter, instantsToIndex,
metadataCompletedTimestamps, table.getMetaClient()));
+ try {
+ postRequestIndexingTaskFuture.get(config.getIndexingCheckTimeout(),
TimeUnit.SECONDS);
+ } catch (TimeoutException | InterruptedException | ExecutionException e)
{
+ postRequestIndexingTaskFuture.cancel(true);
+ } finally {
+ executorService.shutdownNow();
+ }
+ // save index commit metadata and return
+ List<HoodieIndexPartitionInfo> finalIndexPartitionInfos =
indexPartitionInfos.stream()
+ .map(info -> new HoodieIndexPartitionInfo(
+ info.getVersion(),
+ info.getMetadataPartitionPath(),
+ currentIndexedInstant))
+ .collect(Collectors.toList());
+ HoodieIndexCommitMetadata indexCommitMetadata =
HoodieIndexCommitMetadata.newBuilder()
+
.setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build();
+ try {
+ txnManager.beginTransaction();
+ table.getActiveTimeline().saveAsComplete(
+ new HoodieInstant(true, INDEX_ACTION, indexInstant.getTimestamp()),
+
TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
+ } finally {
+ txnManager.endTransaction();
+ }
+ return Option.of(indexCommitMetadata);
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to index instant:
%s", indexInstant));
+ }
+ }
+
+ private static List<HoodieInstant>
getRemainingArchivedAndActiveInstantsSince(String instant,
HoodieTableMetaClient metaClient) {
+ List<HoodieInstant> remainingInstantsToIndex =
metaClient.getArchivedTimeline()
+ .getWriteTimeline()
+ .findInstantsAfter(instant)
+ .getInstants().collect(Collectors.toList());
+
remainingInstantsToIndex.addAll(metaClient.getActiveTimeline().getWriteTimeline().findInstantsAfter(instant).getInstants().collect(Collectors.toList()));
+ return remainingInstantsToIndex;
+ }
+
+ private static List<HoodieInstant>
getCompletedArchivedAndActiveInstantsAfter(String instant,
HoodieTableMetaClient metaClient) {
+ List<HoodieInstant> completedInstants = metaClient.getArchivedTimeline()
+ .filterCompletedInstants()
+ .findInstantsAfter(instant)
+ .getInstants().collect(Collectors.toList());
+
completedInstants.addAll(metaClient.getActiveTimeline().filterCompletedInstants().findInstantsAfter(instant).getInstants().collect(Collectors.toList()));
+ return completedInstants;
+ }
+
+ /**
+ * Indexing check runs for instants that completed after the base instant
(in the index plan).
+ * It will check if these later instants have logged updates to metadata
table or not.
+ * If not, then it will do the update. If a later instant is inflight, it
will wait until it is completed or the task times out.
+ */
+ class IndexingCheckTask implements Runnable {
+
+ private final HoodieTableMetadataWriter metadataWriter;
+ private final List<HoodieInstant> instantsToIndex;
+ private final Set<String> metadataCompletedInstants;
+ private final HoodieTableMetaClient metaClient;
+
+ IndexingCheckTask(HoodieTableMetadataWriter metadataWriter,
+ List<HoodieInstant> instantsToIndex,
+ Set<String> metadataCompletedInstants,
+ HoodieTableMetaClient metaClient) {
+ this.metadataWriter = metadataWriter;
+ this.instantsToIndex = instantsToIndex;
+ this.metadataCompletedInstants = metadataCompletedInstants;
+ this.metaClient = metaClient;
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.interrupted()) {
+ for (HoodieInstant instant : instantsToIndex) {
+ // metadata index already updated for this instant
+ if (metadataCompletedInstants.contains(instant.getTimestamp())) {
+ currentIndexedInstant = instant.getTimestamp();
+ continue;
+ }
+ while (!instant.isCompleted()) {
+ // reload timeline and fetch instant details again wait until
timeout
Review comment:
may I know where are we waiting ?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
##########
@@ -19,17 +19,28 @@
package org.apache.hudi.metadata;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.List;
/**
* Interface that supports updating metadata for a given table, as actions
complete.
*/
public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable
{
+ void index(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo>
indexPartitionInfos);
Review comment:
how about "buildIndex" instead of "index"
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.INDEX_ACTION;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ // we use this to update the latest instant in data timeline that has been
indexed in metadata table
+ // this needs to be volatile as it can be updated in the IndexingCheckTask
spawned by this executor
+ // assumption is that only one indexer can execute at a time
+ private volatile String currentIndexedInstant;
+
+ private final TransactionManager txnManager;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ // ensure lock provider configured
+ if
(!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() ||
StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+ throw new HoodieIndexException(String.format("Need to set %s as %s and
configure lock provider class",
+ WRITE_CONCURRENCY_MODE.key(),
OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+ }
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime) &&
REQUESTED.equals(instant.getState()))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No
requested index instant found: %s", instantTime)));
+ try {
+ // read HoodieIndexPlan
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+
+ // get all instants since the plan completed (both from active timeline
and archived timeline)
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ table.getMetaClient().reloadActiveTimeline();
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ List<HoodieInstant> instantsToIndex =
getRemainingArchivedAndActiveInstantsSince(indexUptoInstant,
table.getMetaClient());
+
+ // reconcile with metadata table timeline
+ String metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+ Set<String> metadataCompletedTimestamps =
getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant,
metadataMetaClient).stream()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+ // index all remaining instants with a timeout
+ currentIndexedInstant = indexUptoInstant;
+ ExecutorService executorService =
Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+ Future<?> postRequestIndexingTaskFuture = executorService.submit(
+ new IndexingCheckTask(metadataWriter, instantsToIndex,
metadataCompletedTimestamps, table.getMetaClient()));
+ try {
+ postRequestIndexingTaskFuture.get(config.getIndexingCheckTimeout(),
TimeUnit.SECONDS);
+ } catch (TimeoutException | InterruptedException | ExecutionException e)
{
+ postRequestIndexingTaskFuture.cancel(true);
Review comment:
I was expecting us to throw/fail here. may I know why are we proceeding
further ?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.INDEX_ACTION;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ // we use this to update the latest instant in data timeline that has been
indexed in metadata table
+ // this needs to be volatile as it can be updated in the IndexingCheckTask
spawned by this executor
+ // assumption is that only one indexer can execute at a time
+ private volatile String currentIndexedInstant;
+
+ private final TransactionManager txnManager;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ // ensure lock provider configured
+ if
(!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() ||
StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+ throw new HoodieIndexException(String.format("Need to set %s as %s and
configure lock provider class",
+ WRITE_CONCURRENCY_MODE.key(),
OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+ }
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime) &&
REQUESTED.equals(instant.getState()))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No
requested index instant found: %s", instantTime)));
+ try {
+ // read HoodieIndexPlan
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+
+ // get all instants since the plan completed (both from active timeline
and archived timeline)
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ table.getMetaClient().reloadActiveTimeline();
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ List<HoodieInstant> instantsToIndex =
getRemainingArchivedAndActiveInstantsSince(indexUptoInstant,
table.getMetaClient());
+
+ // reconcile with metadata table timeline
+ String metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+ Set<String> metadataCompletedTimestamps =
getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant,
metadataMetaClient).stream()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+ // index all remaining instants with a timeout
+ currentIndexedInstant = indexUptoInstant;
+ ExecutorService executorService =
Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+ Future<?> postRequestIndexingTaskFuture = executorService.submit(
+ new IndexingCheckTask(metadataWriter, instantsToIndex,
metadataCompletedTimestamps, table.getMetaClient()));
+ try {
+ postRequestIndexingTaskFuture.get(config.getIndexingCheckTimeout(),
TimeUnit.SECONDS);
+ } catch (TimeoutException | InterruptedException | ExecutionException e)
{
+ postRequestIndexingTaskFuture.cancel(true);
+ } finally {
+ executorService.shutdownNow();
+ }
+ // save index commit metadata and return
+ List<HoodieIndexPartitionInfo> finalIndexPartitionInfos =
indexPartitionInfos.stream()
+ .map(info -> new HoodieIndexPartitionInfo(
+ info.getVersion(),
+ info.getMetadataPartitionPath(),
+ currentIndexedInstant))
+ .collect(Collectors.toList());
+ HoodieIndexCommitMetadata indexCommitMetadata =
HoodieIndexCommitMetadata.newBuilder()
+
.setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build();
+ try {
+ txnManager.beginTransaction();
+ table.getActiveTimeline().saveAsComplete(
+ new HoodieInstant(true, INDEX_ACTION, indexInstant.getTimestamp()),
+
TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
+ } finally {
+ txnManager.endTransaction();
+ }
+ return Option.of(indexCommitMetadata);
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to index instant:
%s", indexInstant));
+ }
+ }
+
+ private static List<HoodieInstant>
getRemainingArchivedAndActiveInstantsSince(String instant,
HoodieTableMetaClient metaClient) {
+ List<HoodieInstant> remainingInstantsToIndex =
metaClient.getArchivedTimeline()
+ .getWriteTimeline()
+ .findInstantsAfter(instant)
+ .getInstants().collect(Collectors.toList());
+
remainingInstantsToIndex.addAll(metaClient.getActiveTimeline().getWriteTimeline().findInstantsAfter(instant).getInstants().collect(Collectors.toList()));
+ return remainingInstantsToIndex;
+ }
+
+ private static List<HoodieInstant>
getCompletedArchivedAndActiveInstantsAfter(String instant,
HoodieTableMetaClient metaClient) {
+ List<HoodieInstant> completedInstants = metaClient.getArchivedTimeline()
+ .filterCompletedInstants()
+ .findInstantsAfter(instant)
+ .getInstants().collect(Collectors.toList());
+
completedInstants.addAll(metaClient.getActiveTimeline().filterCompletedInstants().findInstantsAfter(instant).getInstants().collect(Collectors.toList()));
+ return completedInstants;
+ }
+
+ /**
+ * Indexing check runs for instants that completed after the base instant
(in the index plan).
+ * It will check if these later instants have logged updates to metadata
table or not.
+ * If not, then it will do the update. If a later instant is inflight, it
will wait until it is completed or the task times out.
+ */
+ class IndexingCheckTask implements Runnable {
Review comment:
maybe we can name "IndexingCatchupTask"
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -663,20 +711,82 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ if (!dataWriteConfig.isMetadataTableEnabled()) {
+ return;
+ }
+ Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private Set<String> getMetadataPartitionsToUpdate() {
+ // fetch partitions to update from table config
+ Set<String> partitionsToUpdate =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+
partitionsToUpdate.addAll(Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet()));
+ if (!partitionsToUpdate.isEmpty()) {
+ return partitionsToUpdate;
}
+ // fallback to update files partition only if table config returned no
partitions
+ partitionsToUpdate.add(MetadataPartitionType.FILES.getPartitionPath());
+ return partitionsToUpdate;
+ }
+
+ @Override
+ public void index(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+ if (indexPartitionInfos.isEmpty()) {
+ LOG.warn("No partition to index in the plan");
+ return;
+ }
+ String indexUptoInstantTime =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ indexPartitionInfos.forEach(indexPartitionInfo -> {
+ String relativePartitionPath =
indexPartitionInfo.getMetadataPartitionPath();
+ LOG.info(String.format("Creating a new metadata index for partition '%s'
under path %s upto instant %s",
+ relativePartitionPath, metadataWriteConfig.getBasePath(),
indexUptoInstantTime));
+ try {
+ // filegroup should have already been initialized while scheduling
index for this partition
+ if (!dataMetaClient.getFs().exists(new
Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) {
+ throw new HoodieIndexException(String.format("File group not
initialized for metadata partition: %s, indexUptoInstant: %s. Looks like index
scheduling failed!",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to check whether
file group is initialized for metadata partition: %s, indexUptoInstant: %s",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+
+ // return early and populate enabledPartitionTypes correctly (check in
initialCommit)
+ MetadataPartitionType partitionType =
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
+ if (!enabledPartitionTypes.contains(partitionType)) {
+ throw new HoodieIndexException(String.format("Indexing for metadata
partition: %s is not enabled", partitionType));
+ }
+ });
+ // before initial commit update table config
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(),
indexPartitionInfos.stream()
Review comment:
what incase diff processes are started to build index for different
partitions.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -645,12 +669,36 @@ private void initializeFileGroups(HoodieTableMetaClient
dataMetaClient, Metadata
}
}
+ public void dropIndex(List<MetadataPartitionType> indexesToDrop) throws
IOException {
+ Set<String> completedIndexes =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+ Set<String> inflightIndexes =
Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+ for (MetadataPartitionType partitionType : indexesToDrop) {
+ String partitionPath = partitionType.getPartitionPath();
+ if (inflightIndexes.contains(partitionPath)) {
+ LOG.error("Metadata indexing in progress: " + partitionPath);
+ return;
+ }
+ LOG.warn("Deleting Metadata Table partitions: " + partitionPath);
+ dataMetaClient.getFs().delete(new
Path(metadataWriteConfig.getBasePath(), partitionPath), true);
+ completedIndexes.remove(partitionPath);
+ }
+ // update table config
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_COMPLETED.key(),
String.join(",", completedIndexes));
Review comment:
something to think about. Other writes who are holding on to an in
memory table property are not going to get an updated value if we update here.
So, won't they end up failing. i.e. lets say when they read the table
properties, index was available, but when they are about to write to metadata
partitions, lets say it was deleted.
What we need is some kind of notification trigger. but these are completely
diff process altogether. So, I don't think there is any easy way. One simple
(ugly) way we could do is, after updating the table config, wait for 1 min and
then delete the MDT partitions here. So, we give 1 min for other writers to get
the updated table config.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -663,20 +711,82 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ if (!dataWriteConfig.isMetadataTableEnabled()) {
+ return;
+ }
+ Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private Set<String> getMetadataPartitionsToUpdate() {
+ // fetch partitions to update from table config
+ Set<String> partitionsToUpdate =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+
partitionsToUpdate.addAll(Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet()));
+ if (!partitionsToUpdate.isEmpty()) {
+ return partitionsToUpdate;
}
+ // fallback to update files partition only if table config returned no
partitions
+ partitionsToUpdate.add(MetadataPartitionType.FILES.getPartitionPath());
+ return partitionsToUpdate;
+ }
+
+ @Override
+ public void index(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+ if (indexPartitionInfos.isEmpty()) {
+ LOG.warn("No partition to index in the plan");
+ return;
+ }
+ String indexUptoInstantTime =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ indexPartitionInfos.forEach(indexPartitionInfo -> {
+ String relativePartitionPath =
indexPartitionInfo.getMetadataPartitionPath();
+ LOG.info(String.format("Creating a new metadata index for partition '%s'
under path %s upto instant %s",
+ relativePartitionPath, metadataWriteConfig.getBasePath(),
indexUptoInstantTime));
+ try {
+ // filegroup should have already been initialized while scheduling
index for this partition
+ if (!dataMetaClient.getFs().exists(new
Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) {
+ throw new HoodieIndexException(String.format("File group not
initialized for metadata partition: %s, indexUptoInstant: %s. Looks like index
scheduling failed!",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to check whether
file group is initialized for metadata partition: %s, indexUptoInstant: %s",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+
+ // return early and populate enabledPartitionTypes correctly (check in
initialCommit)
+ MetadataPartitionType partitionType =
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
+ if (!enabledPartitionTypes.contains(partitionType)) {
+ throw new HoodieIndexException(String.format("Indexing for metadata
partition: %s is not enabled", partitionType));
+ }
+ });
+ // before initial commit update table config
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(),
indexPartitionInfos.stream()
+
.map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.joining(",")));
+ HoodieTableConfig.update(dataMetaClient.getFs(), new
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
+ // check here for enabled partition types whether filegroups initialized
or not
+ initialCommit(indexUptoInstantTime);
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(),
"");
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_COMPLETED.key(),
indexPartitionInfos.stream()
Review comment:
also, generally think that, there could be different processes building
diff indexes. So, when updating table config, we should append only current
index partitions. and when clearing any configs, again we should clean up only
partitions pertaining to current process.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.INDEX_ACTION;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+ private static final Logger LOG =
LogManager.getLogger(RunIndexActionExecutor.class);
+ private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+ private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION =
INDEX_COMMIT_METADATA_VERSION_1;
+ private static final int MAX_CONCURRENT_INDEXING = 1;
+
+ // we use this to update the latest instant in data timeline that has been
indexed in metadata table
+ // this needs to be volatile as it can be updated in the IndexingCheckTask
spawned by this executor
+ // assumption is that only one indexer can execute at a time
+ private volatile String currentIndexedInstant;
+
+ private final TransactionManager txnManager;
+
+ public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
+ super(context, config, table, instantTime);
+ this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ }
+
+ @Override
+ public Option<HoodieIndexCommitMetadata> execute() {
+ HoodieTimer indexTimer = new HoodieTimer();
+ indexTimer.startTimer();
+
+ // ensure lock provider configured
+ if
(!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() ||
StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
+ throw new HoodieIndexException(String.format("Need to set %s as %s and
configure lock provider class",
+ WRITE_CONCURRENCY_MODE.key(),
OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+ }
+
+ HoodieInstant indexInstant = table.getActiveTimeline()
+ .filterPendingIndexTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantTime) &&
REQUESTED.equals(instant.getState()))
+ .lastInstant()
+ .orElseThrow(() -> new HoodieIndexException(String.format("No
requested index instant found: %s", instantTime)));
+ try {
+ // read HoodieIndexPlan
+ HoodieIndexPlan indexPlan =
TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+ List<HoodieIndexPartitionInfo> indexPartitionInfos =
indexPlan.getIndexPartitionInfos();
+ if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+ throw new HoodieIndexException(String.format("No partitions to index
for instant: %s", instantTime));
+ }
+ // transition requested indexInstant to inflight
+
table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant,
Option.empty());
+ // start indexing for each partition
+ HoodieTableMetadataWriter metadataWriter =
table.getMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format("Could not
get metadata writer to run index action for instant: %s", instantTime)));
+ metadataWriter.index(context, indexPartitionInfos);
+
+ // get all instants since the plan completed (both from active timeline
and archived timeline)
+ // assumption is that all metadata partitions had same instant upto
which they were scheduled to be indexed
+ table.getMetaClient().reloadActiveTimeline();
+ String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ List<HoodieInstant> instantsToIndex =
getRemainingArchivedAndActiveInstantsSince(indexUptoInstant,
table.getMetaClient());
+
+ // reconcile with metadata table timeline
+ String metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+ HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+ Set<String> metadataCompletedTimestamps =
getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant,
metadataMetaClient).stream()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+ // index all remaining instants with a timeout
+ currentIndexedInstant = indexUptoInstant;
+ ExecutorService executorService =
Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+ Future<?> postRequestIndexingTaskFuture = executorService.submit(
+ new IndexingCheckTask(metadataWriter, instantsToIndex,
metadataCompletedTimestamps, table.getMetaClient()));
+ try {
+ postRequestIndexingTaskFuture.get(config.getIndexingCheckTimeout(),
TimeUnit.SECONDS);
+ } catch (TimeoutException | InterruptedException | ExecutionException e)
{
+ postRequestIndexingTaskFuture.cancel(true);
+ } finally {
+ executorService.shutdownNow();
+ }
+ // save index commit metadata and return
+ List<HoodieIndexPartitionInfo> finalIndexPartitionInfos =
indexPartitionInfos.stream()
+ .map(info -> new HoodieIndexPartitionInfo(
+ info.getVersion(),
+ info.getMetadataPartitionPath(),
+ currentIndexedInstant))
+ .collect(Collectors.toList());
+ HoodieIndexCommitMetadata indexCommitMetadata =
HoodieIndexCommitMetadata.newBuilder()
+
.setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build();
+ try {
+ txnManager.beginTransaction();
+ table.getActiveTimeline().saveAsComplete(
+ new HoodieInstant(true, INDEX_ACTION, indexInstant.getTimestamp()),
+
TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
+ } finally {
+ txnManager.endTransaction();
+ }
+ return Option.of(indexCommitMetadata);
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to index instant:
%s", indexInstant));
+ }
+ }
+
+ private static List<HoodieInstant>
getRemainingArchivedAndActiveInstantsSince(String instant,
HoodieTableMetaClient metaClient) {
+ List<HoodieInstant> remainingInstantsToIndex =
metaClient.getArchivedTimeline()
+ .getWriteTimeline()
+ .findInstantsAfter(instant)
+ .getInstants().collect(Collectors.toList());
+
remainingInstantsToIndex.addAll(metaClient.getActiveTimeline().getWriteTimeline().findInstantsAfter(instant).getInstants().collect(Collectors.toList()));
+ return remainingInstantsToIndex;
+ }
+
+ private static List<HoodieInstant>
getCompletedArchivedAndActiveInstantsAfter(String instant,
HoodieTableMetaClient metaClient) {
+ List<HoodieInstant> completedInstants = metaClient.getArchivedTimeline()
+ .filterCompletedInstants()
+ .findInstantsAfter(instant)
+ .getInstants().collect(Collectors.toList());
+
completedInstants.addAll(metaClient.getActiveTimeline().filterCompletedInstants().findInstantsAfter(instant).getInstants().collect(Collectors.toList()));
+ return completedInstants;
+ }
+
+ /**
+ * Indexing check runs for instants that completed after the base instant
(in the index plan).
+ * It will check if these later instants have logged updates to metadata
table or not.
+ * If not, then it will do the update. If a later instant is inflight, it
will wait until it is completed or the task times out.
+ */
+ class IndexingCheckTask implements Runnable {
+
+ private final HoodieTableMetadataWriter metadataWriter;
+ private final List<HoodieInstant> instantsToIndex;
+ private final Set<String> metadataCompletedInstants;
+ private final HoodieTableMetaClient metaClient;
+
+ IndexingCheckTask(HoodieTableMetadataWriter metadataWriter,
+ List<HoodieInstant> instantsToIndex,
+ Set<String> metadataCompletedInstants,
+ HoodieTableMetaClient metaClient) {
+ this.metadataWriter = metadataWriter;
+ this.instantsToIndex = instantsToIndex;
+ this.metadataCompletedInstants = metadataCompletedInstants;
+ this.metaClient = metaClient;
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.interrupted()) {
+ for (HoodieInstant instant : instantsToIndex) {
+ // metadata index already updated for this instant
+ if (metadataCompletedInstants.contains(instant.getTimestamp())) {
+ currentIndexedInstant = instant.getTimestamp();
+ continue;
+ }
+ while (!instant.isCompleted()) {
+ // reload timeline and fetch instant details again wait until
timeout
+ String instantTime = instant.getTimestamp();
+ Option<HoodieInstant> currentInstant =
metaClient.reloadActiveTimeline()
+ .filterCompletedInstants().filter(i ->
i.getTimestamp().equals(instantTime)).firstInstant();
+ instant = currentInstant.orElse(instant);
+ }
+ // update metadata for this completed instant
+ if (COMPLETED.equals(instant.getState())) {
+ try {
+ // we need take a lock here as inflight writer could also try to
update the timeline
+ txnManager.beginTransaction(Option.of(instant), Option.empty());
+ switch (instant.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
+ metadataWriter.update(commitMetadata,
instant.getTimestamp(), false);
+ break;
+ case HoodieTimeline.CLEAN_ACTION:
+ HoodieCleanMetadata cleanMetadata =
CleanerUtils.getCleanerMetadata(table.getMetaClient(), instant);
+ metadataWriter.update(cleanMetadata, instant.getTimestamp());
+ break;
+ case HoodieTimeline.RESTORE_ACTION:
+ HoodieRestoreMetadata restoreMetadata =
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+
table.getActiveTimeline().getInstantDetails(instant).get());
+ metadataWriter.update(restoreMetadata,
instant.getTimestamp());
+ break;
+ case HoodieTimeline.ROLLBACK_ACTION:
+ HoodieRollbackMetadata rollbackMetadata =
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+
table.getActiveTimeline().getInstantDetails(instant).get());
+ metadataWriter.update(rollbackMetadata,
instant.getTimestamp());
+ break;
+ default:
+ throw new IllegalStateException("Unexpected value: " +
instant.getAction());
+ }
+ } catch (IOException e) {
+ LOG.error("Could not update metadata partition for instant: " +
instant);
Review comment:
may be we can throw here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]