the-other-tim-brown commented on code in PR #9367:
URL: https://github.com/apache/hudi/pull/9367#discussion_r1284709452
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -177,6 +177,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
* You can find more details in HUDI-3834.
*/
private static final Lazy<HoodieMetadataColumnStats.Builder>
METADATA_COLUMN_STATS_BUILDER_STUB =
Lazy.lazily(HoodieMetadataColumnStats::newBuilder);
+ private static final HoodieMetadataFileInfo DELETE_FILE_METADATA = new
HoodieMetadataFileInfo(0L, true);
Review Comment:
We kept creating instances of this in the delete path. Let me know if there
is any reason we can't reuse an object for that.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -507,8 +507,8 @@ public static List<HoodieRecord>
convertMetadataToFilesPartitionRecords(HoodieCl
final String partition = getPartitionIdentifier(partitionName);
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
- HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
- Option.of(new ArrayList<>(deletedFiles)));
Review Comment:
We were creating a copy of this list. I didn't see any reason to do this but
let me know if I'm missing something.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.avro.model.IntWrapper;
+import org.apache.hudi.avro.model.StringWrapper;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.action.clean.CleanPlanner;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.reflect.ClassTag;
+
+import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
+
+/**
+ * Asserts that tables initialized from file paths created outside Hudi can
properly be loaded.
+ */
+public class TestExternalPathHandling extends HoodieClientTestBase {
+
+ private static final String FIELD_1 = "field1";
+ private static final String FIELD_2 = "field2";
+ private HoodieWriteConfig writeConfig;
+
+ @ParameterizedTest
+ @MethodSource("getArgs")
+ public void testFlow(FileIdAndNameGenerator fileIdAndNameGenerator,
List<String> partitions) throws Exception {
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ writeConfig = HoodieWriteConfig.newBuilder()
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
+ .withPath(metaClient.getBasePathV2().toString())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(2)
+ .withFileSystemBootstrapDisabled(true)
+ .enable(true)
+ .withMetadataIndexColumnStats(true)
+ .withColumnStatsIndexForColumns(FIELD_1 + "," + FIELD_2)
+ .build())
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(1,
2).build())
+ .withTableServicesEnabled(true)
+ .build();
+
+ writeClient = getHoodieWriteClient(writeConfig);
+ String instantTime1 =
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);
+ String partitionPath1 = partitions.get(0);
+ Pair<String, String> fileIdAndName1 = fileIdAndNameGenerator.generate(1,
instantTime1);
+ String fileId1 = fileIdAndName1.getLeft();
+ String fileName1 = fileIdAndName1.getRight();
+ String filePath1 = getPath(partitionPath1, fileName1);
+ WriteStatus writeStatus1 = createWriteStatus(partitionPath1, filePath1,
fileId1);
+ JavaRDD<WriteStatus> rdd1 =
createRdd(Collections.singletonList(writeStatus1));
+ metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(new
HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime1), Option.empty());
+ writeClient.commit(instantTime1, rdd1, Option.empty(),
HoodieTimeline.REPLACE_COMMIT_ACTION, Collections.emptyMap());
+
+ assertFileGroupCorrectness(instantTime1, partitionPath1, filePath1,
fileId1, 1);
+
+ // add a new file and remove the old one
+ String instantTime2 =
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);
+ Pair<String, String> fileIdAndName2 = fileIdAndNameGenerator.generate(2,
instantTime2);
+ String fileId2 = fileIdAndName2.getLeft();
+ String fileName2 = fileIdAndName2.getRight();
+ String filePath2 = getPath(partitionPath1, fileName2);
+ WriteStatus newWriteStatus = createWriteStatus(partitionPath1, filePath2,
fileId2);
+ JavaRDD<WriteStatus> rdd2 =
createRdd(Collections.singletonList(newWriteStatus));
+ Map<String, List<String>> partitionToReplacedFileIds = new HashMap<>();
+ partitionToReplacedFileIds.put(partitionPath1,
Collections.singletonList(fileId1));
+ metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(new
HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime2), Option.empty());
+ writeClient.commit(instantTime2, rdd2, Option.empty(),
HoodieTimeline.REPLACE_COMMIT_ACTION, partitionToReplacedFileIds);
+
+ assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2,
fileId2, 1);
+
+ // Add file to a new partition
+ String partitionPath2 = partitions.get(1);
+ String instantTime3 =
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);
+ Pair<String, String> fileIdAndName3 = fileIdAndNameGenerator.generate(3,
instantTime3);
+ String fileId3 = fileIdAndName3.getLeft();
+ String fileName3 = fileIdAndName3.getRight();
+ String filePath3 = getPath(partitionPath2, fileName3);
+ WriteStatus writeStatus3 = createWriteStatus(partitionPath2, filePath3,
fileId3);
+ JavaRDD<WriteStatus> rdd3 =
createRdd(Collections.singletonList(writeStatus3));
+ metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(new
HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime3), Option.empty());
+ writeClient.commit(instantTime3, rdd3, Option.empty(),
HoodieTimeline.REPLACE_COMMIT_ACTION, Collections.emptyMap());
+
+ assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3,
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+ // clean first commit
+ String cleanTime = HoodieActiveTimeline.createNewInstantTime();
+ HoodieCleanerPlan cleanerPlan = cleanerPlan(new
HoodieActionInstant(instantTime2, HoodieTimeline.REPLACE_COMMIT_ACTION,
HoodieInstant.State.COMPLETED.name()), instantTime3,
+ Collections.singletonMap(partitionPath1, Collections.singletonList(new
HoodieCleanFileInfo(filePath1, false))));
+ metaClient.getActiveTimeline().saveToCleanRequested(new
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION,
cleanTime),
+ TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+ HoodieInstant inflightClean =
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(
+ new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLEAN_ACTION, cleanTime), Option.empty());
+ List<HoodieCleanStat> cleanStats =
Collections.singletonList(createCleanStat(partitionPath1,
Arrays.asList(filePath1), instantTime2, instantTime3));
+ HoodieCleanMetadata cleanMetadata = CleanerUtils.convertCleanMetadata(
+ cleanTime,
+ Option.empty(),
+ cleanStats);
+ HoodieTableMetadataWriter hoodieTableMetadataWriter =
(HoodieTableMetadataWriter) writeClient.initTable(WriteOperationType.UPSERT,
Option.of(cleanTime)).getMetadataWriter(cleanTime).get();
+ hoodieTableMetadataWriter.update(cleanMetadata, cleanTime);
+
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightClean,
+ TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
+ // make sure we still get the same results as before
+ assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2,
fileId2, partitionPath2.isEmpty() ? 2 : 1);
+ assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3,
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+ // trigger archiver manually
+ writeClient.archive();
+ // assert commit was archived
+ Assertions.assertEquals(1,
metaClient.getArchivedTimeline().reload().filterCompletedInstants().countInstants());
+ // make sure we still get the same results as before
+ assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2,
fileId2, partitionPath2.isEmpty() ? 2 : 1);
+ assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3,
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+ // assert that column stats are correct
+ HoodieBackedTableMetadata hoodieBackedTableMetadata = new
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(),
writeConfig.getBasePath(), true);
+ assertEmptyColStats(hoodieBackedTableMetadata, partitionPath1, fileName1);
+ assertColStats(hoodieBackedTableMetadata, partitionPath1, fileName2);
+ assertColStats(hoodieBackedTableMetadata, partitionPath2, fileName3);
+ }
+
+ static Stream<Arguments> getArgs() {
Review Comment:
I had test cases for the traditional Hudi paths locally to validate the initial
test setup, but removed them once that validation confirmed the test itself was
set up correctly. Let me know if you would like me to add them back in.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -324,33 +325,46 @@ public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List
* @param partition The name of the partition
* @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
* @param filesDeleted List of files which have been deleted from this
partition
+ * @param instantTime Commit time of the commit responsible for adding
and/or deleting these files, will be empty during bootstrapping of the metadata
table
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionFilesRecord(String partition,
-
Option<Map<String, Long>> filesAdded,
-
Option<List<String>> filesDeleted) {
- Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- filesAdded.ifPresent(filesMap ->
- fileInfo.putAll(
- filesMap.entrySet().stream().collect(
- Collectors.toMap(Map.Entry::getKey, (entry) -> {
- long fileSize = entry.getValue();
- // Assert that the file-size of the file being added is
positive, since Hudi
- // should not be creating empty files
- checkState(fileSize > 0);
- return new HoodieMetadataFileInfo(fileSize, false);
- })))
- );
- filesDeleted.ifPresent(filesList ->
- fileInfo.putAll(
- filesList.stream().collect(
- Collectors.toMap(Function.identity(), (ignored) -> new
HoodieMetadataFileInfo(0L, true))))
- );
+
Map<String, Long> filesAdded,
+
List<String> filesDeleted,
Review Comment:
I updated the signature here to avoid wrapping collections in options. Instead,
we can pass empty collections where we previously passed an empty option. This
makes the code a bit cleaner and avoids the extra object overhead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]