the-other-tim-brown commented on code in PR #9367:
URL: https://github.com/apache/hudi/pull/9367#discussion_r1284709452


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -177,6 +177,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
    * You can find more details in HUDI-3834.
    */
   private static final Lazy<HoodieMetadataColumnStats.Builder> 
METADATA_COLUMN_STATS_BUILDER_STUB = 
Lazy.lazily(HoodieMetadataColumnStats::newBuilder);
+  private static final HoodieMetadataFileInfo DELETE_FILE_METADATA = new 
HoodieMetadataFileInfo(0L, true);

Review Comment:
   We kept creating instances of this in the delete path. Let me know if there 
is any reason we can't reuse an object for that.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -507,8 +507,8 @@ public static List<HoodieRecord> 
convertMetadataToFilesPartitionRecords(HoodieCl
       final String partition = getPartitionIdentifier(partitionName);
       // Files deleted from a partition
       List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
-      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
-          Option.of(new ArrayList<>(deletedFiles)));

Review Comment:
   We were creating a copy of this list. I didn't see any reason to do this, but 
let me know if I'm missing something.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.avro.model.IntWrapper;
+import org.apache.hudi.avro.model.StringWrapper;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.action.clean.CleanPlanner;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.reflect.ClassTag;
+
+import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
+
+/**
+ * Asserts that tables initialized from file paths created outside Hudi can 
properly be loaded.
+ */
+public class TestExternalPathHandling extends HoodieClientTestBase {
+
+  private static final String FIELD_1 = "field1";
+  private static final String FIELD_2 = "field2";
+  private HoodieWriteConfig writeConfig;
+
+  @ParameterizedTest
+  @MethodSource("getArgs")
+  public void testFlow(FileIdAndNameGenerator fileIdAndNameGenerator, 
List<String> partitions) throws Exception {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    writeConfig = HoodieWriteConfig.newBuilder()
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
+        .withPath(metaClient.getBasePathV2().toString())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(2)
+            .withFileSystemBootstrapDisabled(true)
+            .enable(true)
+            .withMetadataIndexColumnStats(true)
+            .withColumnStatsIndexForColumns(FIELD_1 + "," + FIELD_2)
+            .build())
+        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(1, 
2).build())
+        .withTableServicesEnabled(true)
+        .build();
+
+    writeClient = getHoodieWriteClient(writeConfig);
+    String instantTime1 = 
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);
+    String partitionPath1 = partitions.get(0);
+    Pair<String, String> fileIdAndName1 = fileIdAndNameGenerator.generate(1, 
instantTime1);
+    String fileId1 = fileIdAndName1.getLeft();
+    String fileName1 = fileIdAndName1.getRight();
+    String filePath1 = getPath(partitionPath1, fileName1);
+    WriteStatus writeStatus1 = createWriteStatus(partitionPath1, filePath1, 
fileId1);
+    JavaRDD<WriteStatus> rdd1 = 
createRdd(Collections.singletonList(writeStatus1));
+    metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(new 
HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime1), Option.empty());
+    writeClient.commit(instantTime1, rdd1, Option.empty(), 
HoodieTimeline.REPLACE_COMMIT_ACTION,  Collections.emptyMap());
+
+    assertFileGroupCorrectness(instantTime1, partitionPath1, filePath1, 
fileId1, 1);
+
+    // add a new file and remove the old one
+    String instantTime2 = 
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);
+    Pair<String, String> fileIdAndName2 = fileIdAndNameGenerator.generate(2, 
instantTime2);
+    String fileId2 = fileIdAndName2.getLeft();
+    String fileName2 = fileIdAndName2.getRight();
+    String filePath2 = getPath(partitionPath1, fileName2);
+    WriteStatus newWriteStatus = createWriteStatus(partitionPath1, filePath2, 
fileId2);
+    JavaRDD<WriteStatus> rdd2 = 
createRdd(Collections.singletonList(newWriteStatus));
+    Map<String, List<String>> partitionToReplacedFileIds = new HashMap<>();
+    partitionToReplacedFileIds.put(partitionPath1, 
Collections.singletonList(fileId1));
+    metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(new 
HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime2), Option.empty());
+    writeClient.commit(instantTime2, rdd2, Option.empty(), 
HoodieTimeline.REPLACE_COMMIT_ACTION, partitionToReplacedFileIds);
+
+    assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2, 
fileId2, 1);
+
+    // Add file to a new partition
+    String partitionPath2 = partitions.get(1);
+    String instantTime3 = 
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION, metaClient);
+    Pair<String, String> fileIdAndName3 = fileIdAndNameGenerator.generate(3, 
instantTime3);
+    String fileId3 = fileIdAndName3.getLeft();
+    String fileName3 = fileIdAndName3.getRight();
+    String filePath3 = getPath(partitionPath2, fileName3);
+    WriteStatus writeStatus3 = createWriteStatus(partitionPath2, filePath3, 
fileId3);
+    JavaRDD<WriteStatus> rdd3 = 
createRdd(Collections.singletonList(writeStatus3));
+    metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(new 
HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime3), Option.empty());
+    writeClient.commit(instantTime3, rdd3, Option.empty(), 
HoodieTimeline.REPLACE_COMMIT_ACTION, Collections.emptyMap());
+
+    assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3, 
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+    // clean first commit
+    String cleanTime = HoodieActiveTimeline.createNewInstantTime();
+    HoodieCleanerPlan cleanerPlan = cleanerPlan(new 
HoodieActionInstant(instantTime2, HoodieTimeline.REPLACE_COMMIT_ACTION, 
HoodieInstant.State.COMPLETED.name()), instantTime3,
+        Collections.singletonMap(partitionPath1, Collections.singletonList(new 
HoodieCleanFileInfo(filePath1, false))));
+    metaClient.getActiveTimeline().saveToCleanRequested(new 
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, 
cleanTime),
+        TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+    HoodieInstant inflightClean = 
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(
+        new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.CLEAN_ACTION, cleanTime), Option.empty());
+    List<HoodieCleanStat> cleanStats = 
Collections.singletonList(createCleanStat(partitionPath1, 
Arrays.asList(filePath1), instantTime2, instantTime3));
+    HoodieCleanMetadata cleanMetadata = CleanerUtils.convertCleanMetadata(
+        cleanTime,
+        Option.empty(),
+        cleanStats);
+    HoodieTableMetadataWriter hoodieTableMetadataWriter = 
(HoodieTableMetadataWriter) writeClient.initTable(WriteOperationType.UPSERT, 
Option.of(cleanTime)).getMetadataWriter(cleanTime).get();
+    hoodieTableMetadataWriter.update(cleanMetadata, cleanTime);
+    
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightClean,
+        TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
+    // make sure we still get the same results as before
+    assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2, 
fileId2, partitionPath2.isEmpty() ? 2 : 1);
+    assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3, 
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+    // trigger archiver manually
+    writeClient.archive();
+    // assert commit was archived
+    Assertions.assertEquals(1, 
metaClient.getArchivedTimeline().reload().filterCompletedInstants().countInstants());
+    // make sure we still get the same results as before
+    assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2, 
fileId2, partitionPath2.isEmpty() ? 2 : 1);
+    assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3, 
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+    // assert that column stats are correct
+    HoodieBackedTableMetadata hoodieBackedTableMetadata = new 
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath(), true);
+    assertEmptyColStats(hoodieBackedTableMetadata, partitionPath1, fileName1);
+    assertColStats(hoodieBackedTableMetadata, partitionPath1, fileName2);
+    assertColStats(hoodieBackedTableMetadata, partitionPath2, fileName3);
+  }
+
+  static Stream<Arguments> getArgs() {

Review Comment:
   I had test cases for the traditional Hudi paths locally to validate the 
initial test setup, but I removed them since they no longer seem relevant once 
the test itself was confirmed to be set up correctly. Let me know if you would 
like me to add them back in.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -324,33 +325,46 @@ public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List
    * @param partition    The name of the partition
    * @param filesAdded   Mapping of files to their sizes for files which have 
been added to this partition
    * @param filesDeleted List of files which have been deleted from this 
partition
+   * @param instantTime  Commit time of the commit responsible for adding 
and/or deleting these files, will be empty during bootstrapping of the metadata 
table
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
-                                                                               
Option<Map<String, Long>> filesAdded,
-                                                                               
Option<List<String>> filesDeleted) {
-    Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    filesAdded.ifPresent(filesMap ->
-        fileInfo.putAll(
-            filesMap.entrySet().stream().collect(
-                Collectors.toMap(Map.Entry::getKey, (entry) -> {
-                  long fileSize = entry.getValue();
-                  // Assert that the file-size of the file being added is 
positive, since Hudi
-                  // should not be creating empty files
-                  checkState(fileSize > 0);
-                  return new HoodieMetadataFileInfo(fileSize, false);
-                })))
-    );
-    filesDeleted.ifPresent(filesList ->
-        fileInfo.putAll(
-            filesList.stream().collect(
-                Collectors.toMap(Function.identity(), (ignored) -> new 
HoodieMetadataFileInfo(0L, true))))
-    );
+                                                                               
Map<String, Long> filesAdded,
+                                                                               
List<String> filesDeleted,

Review Comment:
   I updated the signature here to avoid wrapping collections in Options. 
Instead, we can pass empty collections where we previously used an empty 
Option. This makes the code a bit cleaner and avoids the extra object overhead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to