vinothchandar commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2178811227
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.IndexVersionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EightToNineUpgradeHandler implements UpgradeHandler {
+
+  @Override
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context,
+                                             String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+
+    // Populate missing index versions in index definitions
+    Option<HoodieIndexMetadata> indexMetadataOpt = metaClient.getIndexMetadata();
+    if (indexMetadataOpt.isPresent()) {
+      IndexVersionUtils.populateIndexVersionIfMissing(metaClient.getTableConfig().getTableVersion(), indexMetadataOpt);

Review Comment:
   Can this util method remain within this class? We don't anticipate using it anywhere else, right?

########## hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -226,6 +227,11 @@ public String getPartitionPath(HoodieTableMetaClient metaClient, String indexNam
   public String getPartitionPath(HoodieTableMetaClient metaClient, String indexName) {
     checkArgument(metaClient.getIndexMetadata().isPresent(), "Index definition is not present for index: " + indexName);
     return metaClient.getIndexMetadata().get().getIndexDefinitions().get(indexName).getIndexName();
   }
+
+  @Override
+  public SerializableBiFunction<String, Integer, Integer> getFileGroupIndexFunction(HoodieIndexVersion indexVersion) {

Review Comment:
   Rename: getShardingFunction() or getFileGroupMappingFunction().

########## hudi-common/src/main/java/org/apache/hudi/common/util/IndexVersionUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.metadata.HoodieIndexVersion;
+
+/**
+ * Utility class for managing index version operations.
+ */
+public class IndexVersionUtils {
+
+  /**
+   * Populates missing version attributes in index definitions based on the table version.
+   *
+   * @param tableVersion   the table version used to determine the appropriate index versions
+   * @param indexDefOption optional index metadata containing index definitions
+   */
+  public static void populateIndexVersionIfMissing(HoodieTableVersion tableVersion, Option<HoodieIndexMetadata> indexDefOption) {
+    indexDefOption.ifPresent(idxDefs ->

Review Comment:
   Let's move this closer to upgrade/downgrade unless it's used elsewhere.

########## hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -129,4 +129,21 @@ public abstract <I, K, V> List<V> reduceByKey(
   public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue, Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);

   public abstract <T> ReaderContextFactory<T> getReaderContextFactory(HoodieTableMetaClient metaClient);
+
+  /**
+   * Groups values by key and applies a function to each group of values.
+   * [1 iterator maps to 1 key] It only guarantees that items returned by the same iterator share the same key.
+   * [exactly once across iterators] An item returned by one iterator will not be returned by any other iterator.
+   * [1 key maps to >= 1 iterators] Items belonging to the same shard can be load-balanced across multiple iterators. It is up to API
+   * implementations to decide the load-balancing pattern and how many iterators to split into.
+   *
+   * @param data                  The input Pair<ShardIndex, Item> collection to process.
+   * @param func                  Function to apply to each group of items within the same shard.
+   * @param maxShardIndex         The upper bound of ShardIndex values in the data parameter. If data contains ShardIndex values 1, 2, 6, any maxShardIndex >= 6 is valid.
+   * @param preservesPartitioning Whether to preserve partitioning in the resulting collection.

Review Comment:
   This still does not feel generic enough to shove into the engine context.

########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class NineToEightDowngradeHandler implements DowngradeHandler {
+
+  @Override
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    UpgradeDowngradeUtils.dropNonV1SecondaryIndexPartitions(
+        config, context, table, upgradeDowngradeHelper, "downgrading from table version nine to eight");

Review Comment:
   Just making sure: the extra `version` field in the index metadata is OK? It won't fail after downgrade?

########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java:
##########
@@ -214,4 +219,35 @@ static void rollbackFailedWritesAndCompact(HoodieTable table, HoodieEngineContex
       throw new HoodieException(e);
     }
   }
+
+  /**
+   * Drops secondary index partitions from the metadata table that are V2 or higher.
+   *
+   * @param config        Write config
+   * @param context       Engine context
+   * @param table         Hoodie table
+   * @param operationType Type of operation (upgrade/downgrade)
+   */
+  public static void dropNonV1SecondaryIndexPartitions(HoodieWriteConfig config, HoodieEngineContext context,
+                                                       HoodieTable table, SupportsUpgradeDowngrade upgradeDowngradeHelper, String operationType) {

Review Comment:
   Let's ensure this has a UT.
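[Editor's note] As background for the getFileGroupIndexFunction rename discussion above: the function is essentially a deterministic mapping from a record key to a bounded file-group index. A minimal self-contained sketch of that contract, with hypothetical class and method names (this is not the actual Hudi implementation):

```java
import java.util.function.BiFunction;

// Hypothetical sketch of a key -> file-group ("sharding") mapping function.
// Illustrates the contract only; names and the hashing scheme are assumptions,
// not the actual Hudi code.
class FileGroupMapping {

  // Returns a function that deterministically maps a record key to one of
  // numFileGroups buckets. Math.floorMod keeps the result non-negative even
  // when hashCode() is negative.
  static BiFunction<String, Integer, Integer> fileGroupMappingFunction() {
    return (recordKey, numFileGroups) -> Math.floorMod(recordKey.hashCode(), numFileGroups);
  }
}
```

Whichever name wins (getShardingFunction() vs getFileGroupMappingFunction()), the key property is determinism: for a fixed file-group count, the same key must always land in the same file group.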
########## hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -270,13 +263,49 @@ public boolean buildIndexDefinition(HoodieIndexDefinition indexDefinition) {
   public void deleteIndexDefinition(String indexName) {
     checkState(indexMetadataOpt.isPresent(), "Index metadata is not present");
     indexMetadataOpt.get().getIndexDefinitions().remove(indexName);
+    writeIndexMetadataToStorage();
+  }
+
+  /**
+   * Writes the current index metadata to storage.
+   */
+  public void writeIndexMetadataToStorage() {
+    if (!indexMetadataOpt.isPresent()) {
+      return;
+    }
+    writeIndexMetadataToStorage(indexMetadataOpt.get());
+  }
+
+  /**
+   * Writes the provided index metadata to storage.
+   *
+   * @param indexMetadata the index metadata to write
+   */
+  public void writeIndexMetadataToStorage(HoodieIndexMetadata indexMetadata) {
     String indexMetaPath = getIndexDefinitionPath();
     try {
       // TODO[HUDI-9094]: should not write byte array directly
       FileIOUtils.createFileInPath(storage, new StoragePath(indexMetaPath),
-          Option.of(HoodieInstantWriter.convertByteArrayToWriter(getUTF8Bytes(indexMetadataOpt.get().toJson()))));
-    } catch (IOException e) {
-      throw new HoodieIOException("Could not write expression index metadata at path: " + indexMetaPath, e);
+          Option.of(HoodieInstantWriter.convertByteArrayToWriter(getUTF8Bytes(indexMetadata.toJson()))));
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not write index metadata at path: " + indexMetaPath, e);
+    }
+  }
+
+  /**
+   * Static method to write index metadata to storage.
+   *
+   * @param storage             the storage to write to
+   * @param indexDefinitionPath the path where the index metadata should be written
+   * @param indexMetadata       the index metadata to write
+   */
+  public static void writeIndexMetadataToStorage(HoodieStorage storage, String indexDefinitionPath, HoodieIndexMetadata indexMetadata) {
+    try {

Review Comment:
   UTs, please.

########## hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -129,4 +129,21 @@ public abstract <I, K, V> List<V> reduceByKey(
   public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue, Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);

   public abstract <T> ReaderContextFactory<T> getReaderContextFactory(HoodieTableMetaClient metaClient);
+
+  /**
+   * Groups values by key and applies a function to each group of values.
+   * [1 iterator maps to 1 key] It only guarantees that items returned by the same iterator share the same key.
+   * [exactly once across iterators] An item returned by one iterator will not be returned by any other iterator.
+   * [1 key maps to >= 1 iterators] Items belonging to the same shard can be load-balanced across multiple iterators. It is up to API
+   * implementations to decide the load-balancing pattern and how many iterators to split into.
+   *
+   * @param data                  The input Pair<ShardIndex, Item> collection to process.
+   * @param func                  Function to apply to each group of items within the same shard.
+   * @param maxShardIndex         The upper bound of ShardIndex values in the data parameter. If data contains ShardIndex values 1, 2, 6, any maxShardIndex >= 6 is valid.
+   * @param preservesPartitioning Whether to preserve partitioning in the resulting collection.
+   * @param <R>                   Type of the result
+   * @return Result of applying the function to each group
+   */
+  public abstract <R> HoodieData<R> processValuesOfTheSameShards(
+      HoodiePairData<Integer, String> data, SerializableFunction<Iterator<String>, Iterator<R>> func, Integer maxShardIndex, boolean preservesPartitioning);

Review Comment:
   Here, we need to use partitions and not shards, since this class has no idea about the MDT etc.

   > Groups values by key and applies a function to each group of values.

   Let's name this mapGroupsByKey() or something similar. Please look at the Spark RDD API for inspiration.

########## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -183,8 +225,23 @@ public Builder withIndexOptions(Map<String, String> indexOptions) {
       return this;
     }

+    public Builder withVersion(HoodieIndexVersion version) {
+      // Make sure the version enum matching the metadata partition is used.
+      this.version = version;
+      return this;
+    }
+
     public HoodieIndexDefinition build() {
-      return new HoodieIndexDefinition(indexName, indexType, indexFunction, sourceFields, indexOptions);
+      ValidationUtils.checkArgument(indexName != null, "Could not build index definition with a null index name");
+      ValidationUtils.checkArgument(indexType != null, "Could not build index definition with a null index type");
+      return new HoodieIndexDefinition(
+          indexName,

Review Comment:
   How do we ensure code does not skip writing versions? If it's covered in another place, I am good.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
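[Editor's note] To make the mapGroupsByKey() rename discussion concrete, here is a single-threaded reference sketch of the contract the javadoc describes: values sharing a key are grouped, and the function is applied once to each group's iterator, with each item consumed exactly once. All names here are hypothetical, not the Hudi API; an engine-backed implementation (e.g. Spark groupByKey plus mapPartitions) would distribute the groups and may split one group across several iterators.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Single-threaded reference sketch of "group values by key, then apply a
// function to each group". Hypothetical names; illustrates semantics only.
class GroupByKeyExample {

  static <R> List<R> mapGroupsByKey(List<Map.Entry<Integer, String>> data,
                                    Function<Iterator<String>, Iterator<R>> func) {
    // Group values by key; TreeMap gives a deterministic key order for the demo.
    Map<Integer, List<String>> groups = new TreeMap<>();
    for (Map.Entry<Integer, String> e : data) {
      groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    // One iterator per key: every item a given iterator yields shares the same
    // key, and no item appears in more than one iterator.
    List<R> results = new ArrayList<>();
    for (List<String> group : groups.values()) {
      func.apply(group.iterator()).forEachRemaining(results::add);
    }
    return results;
  }
}
```

Framed this way, the API is purely "partition by key and map each group", which supports the point above that the abstraction need not know anything about shards or the MDT.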
