This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 462fd02 [HUDI-571] Add 'commits show archived' command to CLI
462fd02 is described below
commit 462fd025563b0ae8a4d4f28d366a9bbfca070d3f
Author: Satish Kotha <[email protected]>
AuthorDate: Wed Jan 22 13:50:34 2020 -0800
[HUDI-571] Add 'commits show archived' command to CLI
---
.../apache/hudi/cli/commands/CommitsCommand.java | 105 +++++++++--
.../apache/hudi/io/TestHoodieCommitArchiveLog.java | 73 ++++----
.../apache/hudi/common/model/HoodieWriteStat.java | 3 +-
.../apache/hudi/common/table/HoodieTimeline.java | 8 +
.../table/timeline/HoodieActiveTimeline.java | 89 ----------
.../table/timeline/HoodieArchivedTimeline.java | 192 ++++++++++++++++++---
.../table/timeline/HoodieDefaultTimeline.java | 81 ++++++++-
.../common/table/TestHoodieTableMetaClient.java | 35 ----
8 files changed, 385 insertions(+), 201 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index c0f8ead..3a11e58 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -28,9 +28,12 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.NumericUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
@@ -38,7 +41,10 @@ import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import java.io.IOException;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,6 +57,49 @@ import java.util.stream.Collectors;
@Component
public class CommitsCommand implements CommandMarker {
+ private String printCommits(HoodieDefaultTimeline timeline,
+ final Integer limit, final String sortByField,
+ final boolean descending,
+ final boolean headerOnly) throws IOException {
+ final List<Comparable[]> rows = new ArrayList<>();
+
+ final List<HoodieInstant> commits =
timeline.getCommitsTimeline().filterCompletedInstants()
+ .getInstants().collect(Collectors.toList());
+ // timeline can be read from multiple files. So sort is needed instead of
reversing the collection
+ Collections.sort(commits, HoodieInstant.COMPARATOR.reversed());
+
+ for (int i = 0; i < commits.size(); i++) {
+ final HoodieInstant commit = commits.get(i);
+ final HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+ timeline.getInstantDetails(commit).get(),
+ HoodieCommitMetadata.class);
+ rows.add(new Comparable[]{commit.getTimestamp(),
+ commitMetadata.fetchTotalBytesWritten(),
+ commitMetadata.fetchTotalFilesInsert(),
+ commitMetadata.fetchTotalFilesUpdated(),
+ commitMetadata.fetchTotalPartitionsWritten(),
+ commitMetadata.fetchTotalRecordsWritten(),
+ commitMetadata.fetchTotalUpdateRecordsWritten(),
+ commitMetadata.fetchTotalWriteErrors()});
+ }
+
+ final Map<String, Function<Object, String>> fieldNameToConverterMap = new
HashMap<>();
+ fieldNameToConverterMap.put("Total Bytes Written", entry -> {
+ return
NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+ });
+
+ final TableHeader header = new TableHeader()
+ .addTableHeaderField("CommitTime")
+ .addTableHeaderField("Total Bytes Written")
+ .addTableHeaderField("Total Files Added")
+ .addTableHeaderField("Total Files Updated")
+ .addTableHeaderField("Total Partitions Written")
+ .addTableHeaderField("Total Records Written")
+ .addTableHeaderField("Total Update Records Written")
+ .addTableHeaderField("Total Errors");
+ return HoodiePrintHelper.print(header, fieldNameToConverterMap,
sortByField, descending, limit, headerOnly, rows);
+ }
+
@CliCommand(value = "commits show", help = "Show the commits")
public String showCommits(
@CliOption(key = {"limit"}, help = "Limit commits",
@@ -62,26 +111,39 @@ public class CommitsCommand implements CommandMarker {
throws IOException {
HoodieActiveTimeline activeTimeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
- HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
- List<HoodieInstant> commits =
timeline.getReverseOrderedInstants().collect(Collectors.toList());
- List<Comparable[]> rows = new ArrayList<>();
- for (HoodieInstant commit : commits) {
- HoodieCommitMetadata commitMetadata =
-
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);
- rows.add(new Comparable[] {commit.getTimestamp(),
commitMetadata.fetchTotalBytesWritten(),
- commitMetadata.fetchTotalFilesInsert(),
commitMetadata.fetchTotalFilesUpdated(),
- commitMetadata.fetchTotalPartitionsWritten(),
commitMetadata.fetchTotalRecordsWritten(),
- commitMetadata.fetchTotalUpdateRecordsWritten(),
commitMetadata.fetchTotalWriteErrors()});
- }
-
- Map<String, Function<Object, String>> fieldNameToConverterMap = new
HashMap<>();
- fieldNameToConverterMap.put("Total Bytes Written", entry ->
NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString()))));
+ return printCommits(activeTimeline, limit, sortByField, descending,
headerOnly);
+ }
- TableHeader header = new
TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total
Bytes Written")
- .addTableHeaderField("Total Files Added").addTableHeaderField("Total
Files Updated")
- .addTableHeaderField("Total Partitions
Written").addTableHeaderField("Total Records Written")
- .addTableHeaderField("Total Update Records
Written").addTableHeaderField("Total Errors");
- return HoodiePrintHelper.print(header, fieldNameToConverterMap,
sortByField, descending, limit, headerOnly, rows);
+ @CliCommand(value = "commits show archived", help = "Show the archived
commits")
+ public String showArchivedCommits(
+ @CliOption(key = {"startTs"}, mandatory = false, help = "start time
for commits, default: now - 10 days")
+ String startTs,
+ @CliOption(key = {"endTs"}, mandatory = false, help = "end time for
commits, default: now - 1 day")
+ String endTs,
+ @CliOption(key = {"limit"}, mandatory = false, help = "Limit
commits", unspecifiedDefaultValue = "-1")
+ final Integer limit,
+ @CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "")
+ final String sortByField,
+ @CliOption(key = {"desc"}, help = "Ordering",
unspecifiedDefaultValue = "false")
+ final boolean descending,
+ @CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false")
+ final boolean headerOnly)
+ throws IOException {
+ if (StringUtils.isNullOrEmpty(startTs)) {
+ startTs = getTimeDaysAgo(10);
+ }
+ if (StringUtils.isNullOrEmpty(endTs)) {
+ endTs = getTimeDaysAgo(1);
+ }
+ HoodieArchivedTimeline archivedTimeline =
HoodieCLI.getTableMetaClient().getArchivedTimeline();
+ try {
+ archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
+ return printCommits(archivedTimeline.findInstantsInRange(startTs, endTs),
+ limit, sortByField, descending, headerOnly);
+ } finally {
+ // clear the instant details from memory after printing to reduce usage
+ archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
+ }
}
@CliCommand(value = "commits refresh", help = "Refresh the commits")
@@ -241,4 +303,9 @@ public class CommitsCommand implements CommandMarker {
+ HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
}
+ private String getTimeDaysAgo(int numberOfDays) {
+ Date date =
Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
+ return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
+ }
+
}
diff --git
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index c0fb1ad..eccbc7a 100644
---
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -19,24 +19,18 @@
package org.apache.hudi.io;
import org.apache.hudi.HoodieClientTestHarness;
-import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import com.google.common.collect.Sets;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.After;
@@ -44,7 +38,8 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -197,35 +192,18 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
instants.contains(new HoodieInstant(State.INFLIGHT,
HoodieTimeline.COMPACTION_ACTION, "105")));
// read the file
- Reader reader =
- HoodieLogFormat.newReader(dfs, new HoodieLogFile(new Path(basePath +
"/.hoodie/.commits_.archive.1_1-0-1")),
- HoodieArchivedMetaEntry.getClassSchema());
- int archivedRecordsCount = 0;
- List<IndexedRecord> readRecords = new ArrayList<>();
- // read the avro blocks and validate the number of records written in each
avro block
- int numBlocks = 0;
- while (reader.hasNext()) {
- HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- List<IndexedRecord> records = blk.getRecords();
- readRecords.addAll(records);
- archivedRecordsCount += records.size();
- numBlocks++;
- }
- System.out.println("Read Records :" + readRecords.stream().map(r ->
(GenericRecord) r)
- .map(r -> r.get("actionType") + "_" + r.get("actionState") + "_" +
r.get("commitTime")).collect(Collectors.toList()));
- assertEquals("Total archived records and total read records are the same
count", 24, archivedRecordsCount);
- assertTrue("Average Archived records per block is greater than 1",
archivedRecordsCount / numBlocks > 1);
- // make sure the archived commits are the same as the (originalcommits -
commitsleft)
- Set<String> readCommits = readRecords.stream().map(r -> (GenericRecord)
r).map(r -> {
- return r.get("commitTime").toString();
- }).collect(Collectors.toSet());
+ HoodieArchivedTimeline archivedTimeline = new
HoodieArchivedTimeline(metaClient);
+ assertEquals("Total archived records and total read records are the same
count",
+ 24, archivedTimeline.countInstants());
+    //make sure the archived commits are the same as the (original commits -
commits left)
+ Set<String> readCommits =
+
archivedTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
assertEquals("Read commits map should match the originalCommits -
commitsLoadedFromArchival",
-
originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()),
readCommits);
+
originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()),
readCommits);
// verify in-flight instants after archive
verifyInflightInstants(metaClient, 2);
- reader.close();
}
@Test
@@ -397,6 +375,37 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
timeline.containsInstant(new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "107")));
}
+ @Test
+ public void checkArchiveCommitTimeline() throws IOException,
InterruptedException {
+ HoodieWriteConfig cfg =
+
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2).forTable("test-trip-table")
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
3).build())
+ .build();
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
metaClient);
+
+ HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf());
+ HoodieInstant instant1 = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "1");
+ HoodieTestDataGenerator.createCommitFile(basePath, "2", dfs.getConf());
+ HoodieInstant instant2 = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "2");
+ HoodieTestDataGenerator.createCommitFile(basePath, "3", dfs.getConf());
+ HoodieInstant instant3 = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "3");
+
+ //add 2 more instants to pass filter criteria set in compaction config
above
+ HoodieTestDataGenerator.createCommitFile(basePath, "4", dfs.getConf());
+ HoodieInstant instant4 = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "4");
+ HoodieTestDataGenerator.createCommitFile(basePath, "5", dfs.getConf());
+ HoodieInstant instant5 = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "5");
+
+ boolean result = archiveLog.archiveIfRequired(jsc);
+ assertTrue(result);
+
+ HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
+ List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2,
instant3);
+ assertEquals(new HashSet(archivedInstants),
archivedTimeline.getInstants().collect(Collectors.toSet()));
+ }
+
private void verifyInflightInstants(HoodieTableMetaClient metaClient, int
expectedTotalInstants) {
HoodieTimeline timeline = metaClient.getActiveTimeline().reload()
.getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterInflights();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 0135dbe..97288df 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -135,6 +135,7 @@ public class HoodieWriteStat implements Serializable {
/**
* Total number of rollback blocks seen in a compaction operation.
*/
+ @Nullable
private long totalRollbackBlocks;
/**
@@ -290,7 +291,7 @@ public class HoodieWriteStat implements Serializable {
return totalRollbackBlocks;
}
- public void setTotalRollbackBlocks(Long totalRollbackBlocks) {
+ public void setTotalRollbackBlocks(long totalRollbackBlocks) {
this.totalRollbackBlocks = totalRollbackBlocks;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
index 015a497..575a9ea 100755
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
@@ -234,6 +234,14 @@ public interface HoodieTimeline extends Serializable {
return predicateToApply.test(commit1, commit2);
}
+ /**
+ * Return true if specified timestamp is in range (startTs, endTs].
+ */
+ static boolean isInRange(String timestamp, String startTs, String endTs) {
+ return HoodieTimeline.compareTimestamps(timestamp, startTs, GREATER)
+ && HoodieTimeline.compareTimestamps(timestamp, endTs,
LESSER_OR_EQUAL);
+ }
+
static HoodieInstant getCompletedInstant(final HoodieInstant instant) {
return new HoodieInstant(State.COMPLETED, instant.getAction(),
instant.getTimestamp());
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index f322d47..e5829f8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -27,7 +27,6 @@ import org.apache.hudi.exception.HoodieIOException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -45,7 +44,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
* Represents the Active Timeline for the Hoodie table. Instants for the last
12 hours (configurable) is in the
@@ -134,93 +132,6 @@ public class HoodieActiveTimeline extends
HoodieDefaultTimeline {
in.defaultReadObject();
}
- /**
- * Get all instants (commits, delta commits) that produce new data, in the
active timeline.
- */
- public HoodieTimeline getCommitsTimeline() {
- return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION));
- }
-
- /**
- * Get all instants (commits, delta commits, in-flight/request compaction)
that produce new data, in the active
- * timeline * With Async compaction a requested/inflight compaction-instant
is a valid baseInstant for a file-slice as
- * there could be delta-commits with that baseInstant.
- */
- @Override
- public HoodieTimeline getCommitsAndCompactionTimeline() {
- return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION, COMPACTION_ACTION));
- }
-
- /**
- * Get all instants (commits, delta commits, clean, savepoint, rollback)
that result in actions, in the active
- * timeline.
- */
- public HoodieTimeline getAllCommitsTimeline() {
- return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION,
- SAVEPOINT_ACTION, ROLLBACK_ACTION));
- }
-
- /**
- * Get only pure commits (inflight and completed) in the active timeline.
- */
- public HoodieTimeline getCommitTimeline() {
- return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION));
- }
-
- /**
- * Get only the delta commits (inflight and completed) in the active
timeline.
- */
- public HoodieTimeline getDeltaCommitTimeline() {
- return new
HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION),
- (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
- }
-
- /**
- * Get a timeline of a specific set of actions. useful to create a merged
timeline of multiple actions.
- *
- * @param actions actions allowed in the timeline
- */
- public HoodieTimeline getTimelineOfActions(Set<String> actions) {
- return new HoodieDefaultTimeline(getInstants().filter(s ->
actions.contains(s.getAction())),
- (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
- }
-
- /**
- * Get only the cleaner action (inflight and completed) in the active
timeline.
- */
- public HoodieTimeline getCleanerTimeline() {
- return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION),
- (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
- }
-
- /**
- * Get only the rollback action (inflight and completed) in the active
timeline.
- */
- public HoodieTimeline getRollbackTimeline() {
- return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION),
- (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
- }
-
- /**
- * Get only the save point action (inflight and completed) in the active
timeline.
- */
- public HoodieTimeline getSavePointTimeline() {
- return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION),
- (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
- }
-
- /**
- * Get only the restore action (inflight and completed) in the active
timeline.
- */
- public HoodieTimeline getRestoreTimeline() {
- return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION),
- (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
- }
-
- protected Stream<HoodieInstant> filterInstantsByAction(String action) {
- return getInstants().filter(s -> s.getAction().equals(action));
- }
-
public void createNewInstant(HoodieInstant instant) {
LOG.info("Creating a new instant " + instant);
// Create the in-flight file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 4e45925..a2ad80c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -18,24 +18,36 @@
package org.apache.hudi.common.table.timeline;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Represents the Archived Timeline for the Hoodie table. Instants for the
last 12 hours (configurable) is in the
@@ -49,34 +61,27 @@ import java.util.stream.Collectors;
* This class can be serialized and de-serialized and on de-serialization the
FileSystem is re-initialized.
*/
public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
+ private static final Pattern ARCHIVE_FILE_PATTERN =
+ Pattern.compile("^\\.commits_\\.archive\\.([0-9]*)$");
- private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE = "commits";
+ private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX =
"commits";
+ private static final String ACTION_TYPE_KEY = "actionType";
private HoodieTableMetaClient metaClient;
private Map<String, byte[]> readCommits = new HashMap<>();
private static final Logger LOG =
LogManager.getLogger(HoodieArchivedTimeline.class);
+ /**
+ * Loads instants between (startTs, endTs].
+   * Note that there is no lazy loading, so this may not work if a really long
time range (endTs-startTs) is specified.
+ * TBD: Should we enforce maximum time range?
+ */
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
- // Read back the commits to make sure
- Path archiveLogPath =
HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
- try (SequenceFile.Reader reader =
- new SequenceFile.Reader(metaClient.getHadoopConf(),
SequenceFile.Reader.file(archiveLogPath))) {
- Text key = new Text();
- Text val = new Text();
- while (reader.next(key, val)) {
- // TODO - limit the number of commits loaded in memory. this could get
very large.
- // This is okay because only tooling will load the archived commit
timeline today
- readCommits.put(key.toString(), Arrays.copyOf(val.getBytes(),
val.getLength()));
- }
- this.setInstants(readCommits.keySet().stream().map(s -> new
HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, s))
- .collect(Collectors.toList()));
- } catch (IOException e) {
- throw new HoodieIOException("Could not load archived commit timeline
from path " + archiveLogPath, e);
- }
+ this.metaClient = metaClient;
+ setInstants(this.loadInstants(false));
// multiple casts will make this lambda serializable -
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails;
- this.metaClient = metaClient;
}
/**
@@ -96,7 +101,16 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
}
public static Path getArchiveLogPath(String archiveFolder) {
- return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE);
+ return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX);
+ }
+
+ public void loadInstantDetailsInMemory(String startTs, String endTs) {
+ loadInstants(startTs, endTs);
+ }
+
+ public void clearInstantDetailsFromMemory(String startTs, String endTs) {
+ this.findInstantsInRange(startTs, endTs).getInstants().forEach(instant ->
+ this.readCommits.remove(instant.getTimestamp()));
}
@Override
@@ -108,4 +122,136 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
return new HoodieArchivedTimeline(metaClient);
}
+ private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
+ final String commitTime =
record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
+ final String action = record.get(ACTION_TYPE_KEY).toString();
+ if (loadDetails) {
+ Option.ofNullable(record.get(getMetadataKey(action))).map(actionData ->
+ this.readCommits.put(commitTime,
actionData.toString().getBytes(StandardCharsets.UTF_8))
+ );
+ }
+ return new HoodieInstant(false, action, commitTime);
+ }
+
+ private String getMetadataKey(String action) {
+ switch (action) {
+ case HoodieTimeline.CLEAN_ACTION:
+ return "hoodieCleanMetadata";
+ case HoodieTimeline.COMMIT_ACTION:
+ return "hoodieCommitMetadata";
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ return "hoodieCommitMetadata";
+ case HoodieTimeline.ROLLBACK_ACTION:
+ return "hoodieRollbackMetadata";
+ case HoodieTimeline.SAVEPOINT_ACTION:
+ return "hoodieSavePointMetadata";
+ default:
+ throw new HoodieIOException("Unknown action in metadata " + action);
+ }
+ }
+
+ private List<HoodieInstant> loadInstants(boolean loadInstantDetails) {
+ return loadInstants(null, loadInstantDetails);
+ }
+
+ private List<HoodieInstant> loadInstants(String startTs, String endTs) {
+ return loadInstants(new TimeRangeFilter(startTs, endTs), true);
+ }
+
+ /**
+   * This is a method to read selected instants. Do NOT use this directly; use
one of the helper methods above.
+ * If loadInstantDetails is set to true, this would also update
'readCommits' map with commit details
+ * If filter is specified, only the filtered instants are loaded
+ */
+ private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean
loadInstantDetails) {
+ try {
+ // list all files
+ FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+ new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+
+ // sort files by version suffix in reverse (implies reverse
chronological order)
+ Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
+
+ List<HoodieInstant> instantsInRange = new ArrayList<>();
+ for (FileStatus fs : fsStatuses) {
+ //read the archived file
+ HoodieLogFormat.Reader reader =
HoodieLogFormat.newReader(metaClient.getFs(),
+ new HoodieLogFile(fs.getPath()),
HoodieArchivedMetaEntry.getClassSchema());
+ try {
+ int instantsInPreviousFile = instantsInRange.size();
+ //read the avro blocks
+ while (reader.hasNext()) {
+ HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+ // TODO If we can store additional metadata in datablock, we can
skip parsing records
+ // (such as startTime, endTime of records in the block)
+ List<IndexedRecord> records = blk.getRecords();
+ // filter blocks in desired time window
+ Stream<HoodieInstant> instantsInBlkStream = records.stream()
+ .map(r -> readCommit((GenericRecord) r,
loadInstantDetails));
+
+ if (filter != null) {
+ instantsInBlkStream =
instantsInBlkStream.filter(filter::isInRange);
+ }
+
+
instantsInRange.addAll(instantsInBlkStream.collect(Collectors.toList()));
+ }
+
+ if (filter != null) {
+ int instantsInCurrentFile = instantsInRange.size() -
instantsInPreviousFile;
+ if (instantsInPreviousFile > 0 && instantsInCurrentFile == 0) {
+ // Note that this is an optimization to skip reading unnecessary
archived files
+ // This signals we crossed lower bound of desired time window.
+ break;
+ }
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ return instantsInRange;
+ } catch (IOException e) {
+ throw new HoodieIOException(
+ "Could not load archived commit timeline from path " +
metaClient.getArchivePath(), e);
+ }
+ }
+
+ private static class TimeRangeFilter {
+ private final String startTs;
+ private final String endTs;
+
+ public TimeRangeFilter(String startTs, String endTs) {
+ this.startTs = startTs;
+ this.endTs = endTs;
+ }
+
+ public boolean isInRange(HoodieInstant instant) {
+ return HoodieTimeline.isInRange(instant.getTimestamp(), this.startTs,
this.endTs);
+ }
+ }
+
+ /**
+ * Sort files by reverse order of version suffix in file name.
+ */
+ public static class ArchiveFileVersionComparator implements
Comparator<FileStatus>, Serializable {
+ @Override
+ public int compare(FileStatus f1, FileStatus f2) {
+ return Integer.compare(getArchivedFileSuffix(f2),
getArchivedFileSuffix(f1));
+ }
+
+ private int getArchivedFileSuffix(FileStatus f) {
+ try {
+ Matcher fileMatcher =
ARCHIVE_FILE_PATTERN.matcher(f.getPath().getName());
+ if (fileMatcher.matches()) {
+ return Integer.parseInt(fileMatcher.group(1));
+ }
+ } catch (NumberFormatException e) {
+ // log and ignore any format warnings
+ LOG.warn("error getting suffix for archived file: " + f.getPath());
+ }
+
+ // return default value in case of any errors
+ return 0;
+ }
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 78d6c6f..9f06629 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
@@ -126,8 +127,7 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
@Override
public HoodieDefaultTimeline findInstantsInRange(String startTs, String
endTs) {
return new HoodieDefaultTimeline(
- instants.stream().filter(s ->
HoodieTimeline.compareTimestamps(s.getTimestamp(), startTs, GREATER)
- && HoodieTimeline.compareTimestamps(s.getTimestamp(), endTs,
LESSER_OR_EQUAL)),
+ instants.stream().filter(s ->
HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)),
details);
}
@@ -143,6 +143,83 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
return new HoodieDefaultTimeline(instants.stream().filter(filter),
details);
}
+ /**
+ * Get all instants (commits, delta commits) that produce new data, in the
active timeline.
+ */
+ public HoodieTimeline getCommitsTimeline() {
+ return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION));
+ }
+
+ /**
+ * Get all instants (commits, delta commits, clean, savepoint, rollback)
that result in actions, in the active
+ * timeline.
+ */
+ public HoodieTimeline getAllCommitsTimeline() {
+ return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION,
+ SAVEPOINT_ACTION, ROLLBACK_ACTION));
+ }
+
+ /**
+ * Get only pure commits (inflight and completed) in the active timeline.
+ */
+ public HoodieTimeline getCommitTimeline() {
+ return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION));
+ }
+
+ /**
+ * Get only the delta commits (inflight and completed) in the active
timeline.
+ */
+ public HoodieTimeline getDeltaCommitTimeline() {
+ return new
HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION),
+ (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
+ }
+
+ /**
+ * Get a timeline of a specific set of actions. useful to create a merged
timeline of multiple actions.
+ *
+ * @param actions actions allowed in the timeline
+ */
+ public HoodieTimeline getTimelineOfActions(Set<String> actions) {
+ return new HoodieDefaultTimeline(getInstants().filter(s ->
actions.contains(s.getAction())),
+ (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
+ }
+
+ /**
+ * Get only the cleaner action (inflight and completed) in the active
timeline.
+ */
+ public HoodieTimeline getCleanerTimeline() {
+ return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION),
+ (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
+ }
+
+ /**
+ * Get only the rollback action (inflight and completed) in the active
timeline.
+ */
+ public HoodieTimeline getRollbackTimeline() {
+ return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION),
+ (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
+ }
+
+ /**
+ * Get only the save point action (inflight and completed) in the active
timeline.
+ */
+ public HoodieTimeline getSavePointTimeline() {
+ return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION),
+ (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
+ }
+
+ /**
+ * Get only the restore action (inflight and completed) in the active
timeline.
+ */
+ public HoodieTimeline getRestoreTimeline() {
+ return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION),
+ (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails);
+ }
+
+ protected Stream<HoodieInstant> filterInstantsByAction(String action) {
+ return getInstants().filter(s -> s.getAction().equals(action));
+ }
+
@Override
public boolean empty() {
return !instants.stream().findFirst().isPresent();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index 6864623..8b9f643 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -21,20 +21,13 @@ package org.apache.hudi.common.table;
import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.stream.Collectors;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -100,32 +93,4 @@ public class TestHoodieTableMetaClient extends
HoodieCommonTestHarness {
assertArrayEquals("Commit value should be \"test-detail\"",
"test-detail".getBytes(),
activeCommitTimeline.getInstantDetails(completedInstant).get());
}
-
- @Test
- public void checkArchiveCommitTimeline() throws IOException {
- Path archiveLogPath =
HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
- SequenceFile.Writer writer =
- SequenceFile.createWriter(metaClient.getHadoopConf(),
SequenceFile.Writer.file(archiveLogPath),
- SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class));
-
- writer.append(new Text("1"), new Text("data1"));
- writer.append(new Text("2"), new Text("data2"));
- writer.append(new Text("3"), new Text("data3"));
-
- IOUtils.closeStream(writer);
-
- HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
-
- HoodieInstant instant1 = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "1");
- HoodieInstant instant2 = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "2");
- HoodieInstant instant3 = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "3");
-
- assertEquals(Arrays.asList(instant1, instant2, instant3),
- archivedTimeline.getInstants().collect(Collectors.toList()));
-
- assertArrayEquals(new Text("data1").getBytes(),
archivedTimeline.getInstantDetails(instant1).get());
- assertArrayEquals(new Text("data2").getBytes(),
archivedTimeline.getInstantDetails(instant2).get());
- assertArrayEquals(new Text("data3").getBytes(),
archivedTimeline.getInstantDetails(instant3).get());
- }
-
}