nsivabalan commented on code in PR #18142:
URL: https://github.com/apache/hudi/pull/18142#discussion_r2790351172
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java:
##########
@@ -28,12 +36,83 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
+
/**
* Utils class for performing various log file reading operations.
*/
public class LogReaderUtils {
+
+ /**
+ * Gets a map of log files which were created by commits with instant timestamps that are less than or equal to the
+ * maxCommitInstantTime. All other log files will be filtered out. Each log file maps to the list of commit instant
+ * times associated with the blocks found in that log file.
+ *
+ * @param metaClient Hoodie table meta client
+ * @param fsView Hoodie file system view
+ * @param partitionPaths list of partition paths to fetch log files from. For MDT, this should be "files" or
+ * MetadataPartitionType.FILES.partitionPath()
+ * @param maxCommitInstantTime the max commit which created the log files returned
+ * @param engineContext Engine context
+ * @return map of log file -> associated commit times for each block in the log file
+ */
+ public static Map<HoodieLogFile, List<String>> getAllLogFilesWithMaxCommit(HoodieTableMetaClient metaClient,
+ AbstractTableFileSystemView fsView,
+ List<String> partitionPaths,
+ String maxCommitInstantTime,
+ HoodieEngineContext engineContext) {
+ engineContext.setJobStatus("LogReaderUtils",
+ String.format("Getting list of log files in %s partition(s)",
partitionPaths.size()));
+
+ List<HoodieLogFile> logFiles = engineContext.flatMap(partitionPaths, partitionPath -> fsView
+ .getLatestMergedFileSlicesBeforeOrOn(partitionPath, maxCommitInstantTime)
+ .flatMap(FileSlice::getLogFiles),
+ Math.max(partitionPaths.size(), 1));
+
+ // get completion time of the max commit instant
+ String maxCommitCompletionTime = metaClient.getActiveTimeline().filterCompletedInstants()
+ .findInstantsAfterOrEquals(maxCommitInstantTime, 1)
+ .filter(instant -> instant.requestedTime().equals(maxCommitInstantTime))
+ .getInstants()
+ .stream()
+ .map(instant -> instant.getCompletionTime())
+ .findFirst().orElse(maxCommitInstantTime);
+
+ // filter out all commits completed after the max commit completion time
+ HoodieTimeline filteredTimeline = fsView.getTimeline().filter(instant -> !fsView.getTimeline()
+ .findInstantsModifiedAfterByCompletionTime(maxCommitCompletionTime)
Review Comment:
Can we introduce `findInstantsModifiedBeforeByCompletionTime` in `HoodieTimeline` itself, rather than re-using `findInstantsModifiedAfterByCompletionTime` and negating the output?
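Something along these lines, perhaps (a rough sketch only; the method name and the completion-time comparison are illustrative, not an existing API):

```java
// Illustrative sketch: mirror findInstantsModifiedAfterByCompletionTime,
// but keep instants that completed on or before the given completion time.
default HoodieTimeline findInstantsModifiedBeforeOrEqualsByCompletionTime(String completionTime) {
  return filter(instant -> instant.getCompletionTime() != null
      && instant.getCompletionTime().compareTo(completionTime) <= 0);
}
```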
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link LogReaderUtils}.
+ */
+public class TestLogReaderUtils extends SparkClientFunctionalTestHarness {
+
+ @Test
+ public void testGetAllLogFilesWithMaxCommit() throws Exception {
+ // Create a MERGE_ON_READ table to generate log files
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, new Properties());
+
+ HoodieWriteConfig config = getConfigBuilder(true)
+ .withPath(basePath())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .build())
+ .build();
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+ // First commit - insert data
+ String firstCommit = client.createNewInstantTime(true);
+ List<HoodieRecord> records1 = dataGen.generateInserts(firstCommit, 100);
+ JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 1);
+ List<WriteStatus> statuses1 = client.insert(writeRecords1, firstCommit).collect();
+ assertNoWriteErrors(statuses1);
+
+ // Second commit - update data to create log files
+ String secondCommit = client.createNewInstantTime(true);
+ List<HoodieRecord> records2 = dataGen.generateUpdates(secondCommit, 50);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 1);
+ List<WriteStatus> statuses2 = client.upsert(writeRecords2, secondCommit).collect();
+ assertNoWriteErrors(statuses2);
+
Review Comment:
Can we validate from the completed commit metadata that log files were in fact produced? Due to small-file handling, there is a chance that only parquet files are produced and no log files.
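For example, something like this (a sketch; the metadata-reading helper may differ across Hudi versions, so adapt to what the test harness already uses):

```java
// Sketch: read the completed delta commit's metadata and assert that at
// least one write stat points to a log file, since small-file handling can
// route updates into parquet base files instead.
HoodieInstant lastInstant = metaClient.getActiveTimeline()
    .getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieCommitMetadata commitMetadata =
    TimelineUtils.getCommitMetadata(lastInstant, metaClient.getActiveTimeline());
boolean hasLogFiles = commitMetadata.getPartitionToWriteStats().values().stream()
    .flatMap(List::stream)
    .anyMatch(stat -> stat.getPath().contains(".log."));
assertTrue(hasLogFiles, "Expected the upsert to produce log files, not just parquet files");
```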
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link LogReaderUtils}.
+ */
+public class TestLogReaderUtils extends SparkClientFunctionalTestHarness {
+
+ @Test
+ public void testGetAllLogFilesWithMaxCommit() throws Exception {
+ // Create a MERGE_ON_READ table to generate log files
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, new Properties());
+
+ HoodieWriteConfig config = getConfigBuilder(true)
+ .withPath(basePath())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .build())
+ .build();
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+ // First commit - insert data
+ String firstCommit = client.createNewInstantTime(true);
+ List<HoodieRecord> records1 = dataGen.generateInserts(firstCommit, 100);
+ JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 1);
+ List<WriteStatus> statuses1 = client.insert(writeRecords1, firstCommit).collect();
+ assertNoWriteErrors(statuses1);
+
+ // Second commit - update data to create log files
+ String secondCommit = client.createNewInstantTime(true);
+ List<HoodieRecord> records2 = dataGen.generateUpdates(secondCommit, 50);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 1);
+ List<WriteStatus> statuses2 = client.upsert(writeRecords2, secondCommit).collect();
+ assertNoWriteErrors(statuses2);
+
+ // Third commit - more updates
+ String thirdCommit = client.createNewInstantTime(true);
+ List<HoodieRecord> records3 = dataGen.generateUpdates(thirdCommit, 30);
+ JavaRDD<HoodieRecord> writeRecords3 = jsc().parallelize(records3, 1);
+ List<WriteStatus> statuses3 = client.upsert(writeRecords3, thirdCommit).collect();
+ assertNoWriteErrors(statuses3);
+
+ // Reload metaClient to get latest timeline
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ // Get file system view
+ HoodieTableFileSystemView fsView = HoodieTableFileSystemView.fileListingBasedFileSystemView(
+ context(),
+ metaClient,
+ metaClient.getActiveTimeline().filterCompletedInstants());
+
+ // Get all partitions
+ List<String> partitions = Arrays.asList(dataGen.getPartitionPaths());
+
+ // Test: Get all log files up to the second commit
+ Map<HoodieLogFile, List<String>> logFilesWithMaxCommit = LogReaderUtils.getAllLogFilesWithMaxCommit(
+ metaClient,
+ fsView,
+ partitions,
+ secondCommit,
+ context());
+
+ // Verify results
+ assertNotNull(logFilesWithMaxCommit);
Review Comment:
Can we also assert that this is non-empty?
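For example (using the `assertFalse` already imported in this test):

```java
assertFalse(logFilesWithMaxCommit.isEmpty(), "Expected at least one log file up to the second commit");
```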
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java:
##########
@@ -28,12 +36,83 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
+
/**
* Utils class for performing various log file reading operations.
*/
public class LogReaderUtils {
+
+ /**
+ * Gets a map of log files which were created by commits with instant timestamps that are less than or equal to the
+ * maxCommitInstantTime. All other log files will be filtered out. Each log file maps to the list of commit instant
+ * times associated with the blocks found in that log file.
+ *
+ * @param metaClient Hoodie table meta client
+ * @param fsView Hoodie file system view
+ * @param partitionPaths list of partition paths to fetch log files from. For MDT, this should be "files" or
+ * MetadataPartitionType.FILES.partitionPath()
+ * @param maxCommitInstantTime the max commit which created the log files returned
+ * @param engineContext Engine context
+ * @return map of log file -> associated commit times for each block in the log file
+ */
+ public static Map<HoodieLogFile, List<String>> getAllLogFilesWithMaxCommit(HoodieTableMetaClient metaClient,
+ AbstractTableFileSystemView fsView,
+ List<String> partitionPaths,
+ String maxCommitInstantTime,
+ HoodieEngineContext engineContext) {
+ engineContext.setJobStatus("LogReaderUtils",
+ String.format("Getting list of log files in %s partition(s)",
partitionPaths.size()));
+
+ List<HoodieLogFile> logFiles = engineContext.flatMap(partitionPaths, partitionPath -> fsView
+ .getLatestMergedFileSlicesBeforeOrOn(partitionPath, maxCommitInstantTime)
+ .flatMap(FileSlice::getLogFiles),
+ Math.max(partitionPaths.size(), 1));
+
+ // get completion time of the max commit instant
+ String maxCommitCompletionTime = metaClient.getActiveTimeline().filterCompletedInstants()
+ .findInstantsAfterOrEquals(maxCommitInstantTime, 1)
+ .filter(instant -> instant.requestedTime().equals(maxCommitInstantTime))
+ .getInstants()
+ .stream()
+ .map(instant -> instant.getCompletionTime())
+ .findFirst().orElse(maxCommitInstantTime);
+
+ // filter out all commits completed after the max commit completion time
+ HoodieTimeline filteredTimeline = fsView.getTimeline().filter(instant -> !fsView.getTimeline()
+ .findInstantsModifiedAfterByCompletionTime(maxCommitCompletionTime)
+ .containsInstant(instant));
+
+
+ engineContext.setJobStatus("LogReaderUtils",
+ String.format("Getting log file map for %s partition(s)",
partitionPaths.size()));
+ Map<HoodieLogFile, List<String>> logFilesWithMaxCommit = engineContext.mapToPair(logFiles, logFile -> {
+ // read all blocks within the log file and find the commit associated with each log block
+ List<String> blocksWithinLogFile = new ArrayList<>();
+ HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getStorage(), logFile, null);
+ while (reader.hasNext()) {
+ HoodieLogBlock block = reader.next();
+ String logBlockInstantTime = block.getLogBlockHeader().get(INSTANT_TIME);
+ // check if the log file contains a block created by a commit that is older than or equal to max commit
+ if (filteredTimeline.containsInstant(logBlockInstantTime)) {
Review Comment:
If one of the log blocks' instant times is in the archived timeline, this might not return it, right? Shouldn't we use a `containsOrBeforeTimelineStarts` kind of API?
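i.e. something like the following (a sketch; `containsOrBeforeTimelineStarts` treats instants older than the start of the active timeline as valid, which covers archived commits):

```java
// Sketch: also accept blocks written by commits that have been archived,
// i.e. whose instant time predates the start of the active timeline.
if (filteredTimeline.containsOrBeforeTimelineStarts(logBlockInstantTime)) {
  blocksWithinLogFile.add(logBlockInstantTime);
}
```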
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link LogReaderUtils}.
+ */
+public class TestLogReaderUtils extends SparkClientFunctionalTestHarness {
+
+ @Test
+ public void testGetAllLogFilesWithMaxCommit() throws Exception {
+ // Create a MERGE_ON_READ table to generate log files
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, new Properties());
+
+ HoodieWriteConfig config = getConfigBuilder(true)
+ .withPath(basePath())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .build())
+ .build();
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+ // First commit - insert data
+ String firstCommit = client.createNewInstantTime(true);
+ List<HoodieRecord> records1 = dataGen.generateInserts(firstCommit, 100);
+ JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 1);
+ List<WriteStatus> statuses1 = client.insert(writeRecords1, firstCommit).collect();
+ assertNoWriteErrors(statuses1);
+
+ // Second commit - update data to create log files
+ String secondCommit = client.createNewInstantTime(true);
+ List<HoodieRecord> records2 = dataGen.generateUpdates(secondCommit, 50);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 1);
+ List<WriteStatus> statuses2 = client.upsert(writeRecords2, secondCommit).collect();
+ assertNoWriteErrors(statuses2);
+
+ // Third commit - more updates
+ String thirdCommit = client.createNewInstantTime(true);
+ List<HoodieRecord> records3 = dataGen.generateUpdates(thirdCommit, 30);
+ JavaRDD<HoodieRecord> writeRecords3 = jsc().parallelize(records3, 1);
+ List<WriteStatus> statuses3 = client.upsert(writeRecords3, thirdCommit).collect();
+ assertNoWriteErrors(statuses3);
+
+ // Reload metaClient to get latest timeline
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ // Get file system view
+ HoodieTableFileSystemView fsView = HoodieTableFileSystemView.fileListingBasedFileSystemView(
+ context(),
+ metaClient,
+ metaClient.getActiveTimeline().filterCompletedInstants());
+
+ // Get all partitions
+ List<String> partitions = Arrays.asList(dataGen.getPartitionPaths());
+
+ // Test: Get all log files up to the second commit
+ Map<HoodieLogFile, List<String>> logFilesWithMaxCommit = LogReaderUtils.getAllLogFilesWithMaxCommit(
+ metaClient,
+ fsView,
+ partitions,
+ secondCommit,
+ context());
+
+ // Verify results
+ assertNotNull(logFilesWithMaxCommit);
+
+ // For a MOR table with updates, we should have log files
+ // The method should filter out log files created after secondCommit
+ for (Map.Entry<HoodieLogFile, List<String>> entry : logFilesWithMaxCommit.entrySet()) {
+ HoodieLogFile logFile = entry.getKey();
+ List<String> commitTimes = entry.getValue();
+
+ assertNotNull(logFile);
+ assertFalse(commitTimes.isEmpty(), "Each log file should have at least
one associated commit time");
+
+ // Verify all commit times are <= secondCommit
+ for (String commitTime : commitTimes) {
+ assertTrue(commitTime.compareTo(thirdCommit) < 0,
+ "Commit time should be before third commit: " + commitTime);
+ }
+ }
+
+ // Test with third commit - should include more log files
+ Map<HoodieLogFile, List<String>> allLogFiles = LogReaderUtils.getAllLogFilesWithMaxCommit(
+ metaClient,
+ fsView,
+ partitions,
+ thirdCommit,
+ context());
+
+ assertNotNull(allLogFiles);
+ // Should have same or more entries when including more commits
+ assertTrue(allLogFiles.size() >= logFilesWithMaxCommit.size(),
+ "Should have same or more log files when including third commit");
+ }
Review Comment:
Let's add a test where the first log file's commit time is archived.
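Rough sketch of the setup for that scenario (the clean/archival knobs below are illustrative; tune them so the first log-writing delta commit crosses the archival boundary):

```java
// Sketch: configure aggressive cleaning + archival so that, after enough
// upserts, the first delta commit that wrote log files gets archived.
HoodieWriteConfig archivalConfig = getConfigBuilder(true)
    .withPath(basePath())
    .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
    .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
    .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
    .build();
// Then: write several commits, verify the first commit is no longer in the
// active timeline, and assert getAllLogFilesWithMaxCommit still returns the
// blocks written by that archived commit.
```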
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]