This is an automated email from the ASF dual-hosted git repository.
satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4a34318 [HUDI-1746] Added support for replace commits in commit
showpartitions, commit show_write_stats, commit showfiles (#2678)
4a34318 is described below
commit 4a3431866d995fd877a067967457591a0bc8847d
Author: jsbali <[email protected]>
AuthorDate: Wed Apr 21 23:01:35 2021 +0530
[HUDI-1746] Added support for replace commits in commit showpartitions,
commit show_write_stats, commit showfiles (#2678)
* Added support for replace commits in commit showpartitions, commit
show_write_stats, commit showfiles
* Adding CR changes
* [HUDI-1746] Code review changes
---
.../apache/hudi/cli/commands/CommitsCommand.java | 79 ++++++++++----
.../hudi/cli/commands/TestCommitsCommand.java | 120 ++++++++++++++++++++-
.../HoodieTestCommitMetadataGenerator.java | 21 ++--
.../HoodieTestReplaceCommitMetadatGenerator.java | 92 ++++++++++++++++
4 files changed, 282 insertions(+), 30 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 3e216b4..9517234 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -26,6 +26,7 @@ import org.apache.hudi.cli.utils.CommitUtil;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -34,6 +35,7 @@ import
org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.NumericUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.spark.launcher.SparkLauncher;
@@ -44,6 +46,7 @@ import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -266,15 +269,18 @@ public class CommitsCommand implements CommandMarker {
HoodieActiveTimeline activeTimeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
- HoodieInstant commitInstant = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, instantTime);
- if (!timeline.containsInstant(commitInstant)) {
+ Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline,
instantTime);
+ Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(timeline, hoodieInstantOption);
+
+ if (!commitMetadataOptional.isPresent()) {
return "Commit " + instantTime + " not found in Commits " + timeline;
}
- HoodieCommitMetadata meta =
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(),
- HoodieCommitMetadata.class);
+
+ HoodieCommitMetadata meta = commitMetadataOptional.get();
List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, List<HoodieWriteStat>> entry :
meta.getPartitionToWriteStats().entrySet()) {
+ String action = hoodieInstantOption.get().getAction();
String path = entry.getKey();
List<HoodieWriteStat> stats = entry.getValue();
long totalFilesAdded = 0;
@@ -294,7 +300,7 @@ public class CommitsCommand implements CommandMarker {
totalBytesWritten += stat.getTotalWriteBytes();
totalWriteErrors += stat.getTotalWriteErrors();
}
- rows.add(new Comparable[] {path, totalFilesAdded, totalFilesUpdated,
totalRecordsInserted, totalRecordsUpdated,
+ rows.add(new Comparable[] {action, path, totalFilesAdded,
totalFilesUpdated, totalRecordsInserted, totalRecordsUpdated,
totalBytesWritten, totalWriteErrors});
}
@@ -302,7 +308,8 @@ public class CommitsCommand implements CommandMarker {
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN,
entry ->
NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
- TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+ TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED)
@@ -328,24 +335,29 @@ public class CommitsCommand implements CommandMarker {
HoodieActiveTimeline activeTimeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
- HoodieInstant commitInstant = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, instantTime);
- if (!timeline.containsInstant(commitInstant)) {
+ Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline,
instantTime);
+ Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(timeline, hoodieInstantOption);
+
+ if (!commitMetadataOptional.isPresent()) {
return "Commit " + instantTime + " not found in Commits " + timeline;
}
- HoodieCommitMetadata meta =
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(),
- HoodieCommitMetadata.class);
+
+ HoodieCommitMetadata meta = commitMetadataOptional.get();
+
+ String action = hoodieInstantOption.get().getAction();
long recordsWritten = meta.fetchTotalRecordsWritten();
long bytesWritten = meta.fetchTotalBytesWritten();
long avgRecSize = (long) Math.ceil((1.0 * bytesWritten) / recordsWritten);
List<Comparable[]> rows = new ArrayList<>();
- rows.add(new Comparable[] {bytesWritten, recordsWritten, avgRecSize});
+ rows.add(new Comparable[] {action, bytesWritten, recordsWritten,
avgRecSize});
Map<String, Function<Object, String>> fieldNameToConverterMap = new
HashMap<>();
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN,
entry ->
NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
- TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN_COMMIT)
+ TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN_COMMIT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN_COMMIT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_AVG_REC_SIZE_COMMIT);
@@ -367,24 +379,28 @@ public class CommitsCommand implements CommandMarker {
HoodieActiveTimeline activeTimeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
- HoodieInstant commitInstant = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, instantTime);
- if (!timeline.containsInstant(commitInstant)) {
+ Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline,
instantTime);
+ Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(timeline, hoodieInstantOption);
+
+ if (!commitMetadataOptional.isPresent()) {
return "Commit " + instantTime + " not found in Commits " + timeline;
}
- HoodieCommitMetadata meta =
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(),
- HoodieCommitMetadata.class);
+
+ HoodieCommitMetadata meta = commitMetadataOptional.get();
List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, List<HoodieWriteStat>> entry :
meta.getPartitionToWriteStats().entrySet()) {
+ String action = hoodieInstantOption.get().getAction();
String path = entry.getKey();
List<HoodieWriteStat> stats = entry.getValue();
for (HoodieWriteStat stat : stats) {
- rows.add(new Comparable[] {path, stat.getFileId(),
stat.getPrevCommit(), stat.getNumUpdateWrites(),
+ rows.add(new Comparable[] {action, path, stat.getFileId(),
stat.getPrevCommit(), stat.getNumUpdateWrites(),
stat.getNumWrites(), stat.getTotalWriteBytes(),
stat.getTotalWriteErrors(), stat.getFileSizeInBytes()});
}
}
- TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+ TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
@@ -431,4 +447,31 @@ public class CommitsCommand implements CommandMarker {
return "Load sync state between " +
HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
+ HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
}
+
+ /*
+ * Finds the commit, replacecommit or deltacommit instant with the given instant time in the timeline, if any.
+ */
+ private Option<HoodieInstant> getCommitForInstant(HoodieTimeline timeline,
String instantTime) throws IOException {
+ List<HoodieInstant> instants = Arrays.asList(
+ new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
instantTime),
+ new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION,
instantTime),
+ new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION,
instantTime));
+
+ Option<HoodieInstant> hoodieInstant =
Option.fromJavaOptional(instants.stream().filter(timeline::containsInstant).findAny());
+
+ return hoodieInstant;
+ }
+
+ private Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTimeline
timeline, Option<HoodieInstant> hoodieInstant) throws IOException {
+ if (hoodieInstant.isPresent()) {
+ if
(hoodieInstant.get().getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ return
Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(),
+ HoodieReplaceCommitMetadata.class));
+ }
+ return
Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(),
+ HoodieCommitMetadata.class));
+ }
+
+ return Option.empty();
+ }
}
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index 5ad4c4c..e88f129 100644
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -24,10 +24,12 @@ import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.cli.testutils.HoodieTestReplaceCommitMetadatGenerator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -75,6 +77,7 @@ public class TestCommitsCommand extends
AbstractShellIntegrationTest {
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+ metaClient = HoodieCLI.getTableMetaClient();
}
private LinkedHashMap<String, Integer[]> generateData() throws Exception {
@@ -97,6 +100,42 @@ public class TestCommitsCommand extends
AbstractShellIntegrationTest {
return data;
}
+ /*
+ * Generates two commits (101, 102) and one replace commit (103) and returns them keyed by instant.
+ */
+ private LinkedHashMap<HoodieInstant, Integer[]> generateMixedData() throws
Exception {
+ // generate data and metadata
+ LinkedHashMap<HoodieInstant, Integer[]> replaceCommitData = new
LinkedHashMap<>();
+ replaceCommitData.put(new HoodieInstant(false,
HoodieTimeline.REPLACE_COMMIT_ACTION, "103"), new Integer[] {15, 10});
+
+ LinkedHashMap<HoodieInstant, Integer[]> commitData = new LinkedHashMap<>();
+ commitData.put(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
"102"), new Integer[] {15, 10});
+ commitData.put(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
"101"), new Integer[] {20, 10});
+
+ for (Map.Entry<HoodieInstant, Integer[]> entry : commitData.entrySet()) {
+ String key = entry.getKey().getTimestamp();
+ Integer[] value = entry.getValue();
+
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key,
jsc.hadoopConfiguration(),
+ Option.of(value[0]), Option.of(value[1]));
+ }
+
+ for (Map.Entry<HoodieInstant, Integer[]> entry :
replaceCommitData.entrySet()) {
+ String key = entry.getKey().getTimestamp();
+ Integer[] value = entry.getValue();
+
HoodieTestReplaceCommitMetadatGenerator.createReplaceCommitFileWithMetadata(tablePath,
key,
+ Option.of(value[0]), Option.of(value[1]), metaClient);
+ }
+
+ metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+ assertEquals(3,
metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(),
+ "There should be 3 commits");
+
+ LinkedHashMap<HoodieInstant, Integer[]> data = replaceCommitData;
+ data.putAll(commitData);
+
+ return data;
+ }
+
private String generateExpectData(int records, Map<String, Integer[]> data)
throws IOException {
FileSystem fs = FileSystem.get(jsc.hadoopConfiguration());
List<String> partitionPaths =
@@ -216,14 +255,15 @@ public class TestCommitsCommand extends
AbstractShellIntegrationTest {
// prevCommit not null, so add 0, update 1
Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition
->
- rows.add(new Comparable[] {partition, 0, 1, 0, value[1],
HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0})
+ rows.add(new Comparable[] {HoodieTimeline.COMMIT_ACTION, partition, 0,
1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0})
);
Map<String, Function<Object, String>> fieldNameToConverterMap = new
HashMap<>();
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN,
entry ->
NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
- TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+ TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED)
@@ -237,6 +277,43 @@ public class TestCommitsCommand extends
AbstractShellIntegrationTest {
assertEquals(expected, got);
}
+ @Test
+ public void testShowCommitPartitionsWithReplaceCommits() throws Exception {
+ Map<HoodieInstant, Integer[]> data = generateMixedData();
+
+ for (HoodieInstant commitInstant: data.keySet()) {
+ CommandResult cr = getShell().executeCommand(String.format("commit
showpartitions --commit %s", commitInstant.getTimestamp()));
+
+ assertTrue(cr.isSuccess());
+
+ Integer[] value = data.get(commitInstant);
+ List<Comparable[]> rows = new ArrayList<>();
+ // prevCommit is not null, so each partition reports 0 files added and 1 file updated
+ Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition
->
+ rows.add(new Comparable[] {commitInstant.getAction(), partition, 0,
1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0})
+ );
+
+ Map<String, Function<Object, String>> fieldNameToConverterMap = new
HashMap<>();
+
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN,
+ entry ->
NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
+
+ TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
+
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
+
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED)
+
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
+
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);
+
+ String expected = HoodiePrintHelper.print(header,
fieldNameToConverterMap, "", false, -1, false, rows);
+ expected = removeNonWordAndStripSpace(expected);
+ String got = removeNonWordAndStripSpace(cr.getResult().toString());
+ assertEquals(expected, got);
+ }
+ }
+
/**
* Test case of 'commit showfiles' command.
*/
@@ -252,12 +329,13 @@ public class TestCommitsCommand extends
AbstractShellIntegrationTest {
List<Comparable[]> rows = new ArrayList<>();
Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition
->
- rows.add(new Comparable[] {partition,
HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
+ rows.add(new Comparable[] {HoodieTimeline.COMMIT_ACTION, partition,
HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT,
value[1], value[0],
HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES,
// default 0 errors and blank file with 0 size
0, 0}));
- TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+ TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
@@ -272,6 +350,40 @@ public class TestCommitsCommand extends
AbstractShellIntegrationTest {
assertEquals(expected, got);
}
+ @Test
+ public void testShowCommitFilesWithReplaceCommits() throws Exception {
+ Map<HoodieInstant, Integer[]> data = generateMixedData();
+
+ for (HoodieInstant commitInstant : data.keySet()) {
+ CommandResult cr = getShell().executeCommand(String.format("commit
showfiles --commit %s", commitInstant.getTimestamp()));
+ assertTrue(cr.isSuccess());
+
+ Integer[] value = data.get(commitInstant);
+ List<Comparable[]> rows = new ArrayList<>();
+ Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition
->
+ rows.add(new Comparable[] {commitInstant.getAction(), partition,
HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
+ HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT,
+ value[1], value[0],
HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES,
+ // default 0 errors and blank file with 0 size
+ 0, 0}));
+ TableHeader header = new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
+
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
+
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
+
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_SIZE);
+
+ String expected = HoodiePrintHelper.print(header, new HashMap<>(), "",
false, -1, false, rows);
+ expected = removeNonWordAndStripSpace(expected);
+ String got = removeNonWordAndStripSpace(cr.getResult().toString());
+ assertEquals(expected, got);
+ }
+ }
+
/**
* Test case of 'commits compare' command.
*/
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
index f4d8019..c33bb26 100644
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
@@ -76,14 +77,17 @@ public class HoodieTestCommitMetadataGenerator extends
HoodieTestDataGenerator {
List<String> commitFileNames =
Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime),
HoodieTimeline.makeInflightCommitFileName(commitTime),
HoodieTimeline.makeRequestedCommitFileName(commitTime));
for (String name : commitFileNames) {
- Path commitFilePath = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/" + name);
- try (FSDataOutputStream os = FSUtils.getFs(basePath,
configuration).create(commitFilePath, true)) {
- // Generate commitMetadata
- HoodieCommitMetadata commitMetadata =
- generateCommitMetadata(basePath, commitTime, fileId1, fileId2,
writes, updates);
- // Write empty commit metadata
- os.writeBytes(new
String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
- }
+ HoodieCommitMetadata commitMetadata =
+ generateCommitMetadata(basePath, commitTime, fileId1, fileId2,
writes, updates);
+ String content = commitMetadata.toJsonString();
+ createFileWithMetadata(basePath, configuration, name, content);
+ }
+ }
+
+ static void createFileWithMetadata(String basePath, Configuration
configuration, String name, String content) throws IOException {
+ Path commitFilePath = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/" + name);
+ try (FSDataOutputStream os = FSUtils.getFs(basePath,
configuration).create(commitFilePath, true)) {
+ os.writeBytes(new String(content.getBytes(StandardCharsets.UTF_8)));
}
}
@@ -133,4 +137,5 @@ public class HoodieTestCommitMetadataGenerator extends
HoodieTestDataGenerator {
}));
return metadata;
}
+
}
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java
new file mode 100644
index 0000000..f7244f9
--- /dev/null
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.testutils;
+
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.FileCreateUtils;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
+import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
+
+public class HoodieTestReplaceCommitMetadatGenerator extends
HoodieTestCommitMetadataGenerator {
+ public static void createReplaceCommitFileWithMetadata(String basePath,
String commitTime, Option<Integer> writes, Option<Integer> updates,
+ HoodieTableMetaClient
metaclient) throws Exception {
+
+ HoodieReplaceCommitMetadata replaceMetadata =
generateReplaceCommitMetadata(basePath, commitTime,
UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(), writes, updates);
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata =
getHoodieRequestedReplaceMetadata();
+
+ HoodieTestTable.of(metaclient).addReplaceCommit(commitTime,
requestedReplaceMetadata, replaceMetadata);
+ }
+
+ private static HoodieRequestedReplaceMetadata
getHoodieRequestedReplaceMetadata() {
+ return HoodieRequestedReplaceMetadata.newBuilder()
+ .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
+ .setVersion(1)
+ .setExtraMetadata(Collections.emptyMap())
+ .build();
+ }
+
+ private static HoodieReplaceCommitMetadata
generateReplaceCommitMetadata(String basePath, String commitTime, String
fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates)
+ throws Exception {
+ FileCreateUtils.createBaseFile(basePath, DEFAULT_FIRST_PARTITION_PATH,
commitTime, fileId1);
+ FileCreateUtils.createBaseFile(basePath, DEFAULT_SECOND_PARTITION_PATH,
commitTime, fileId2);
+ return generateReplaceCommitMetadata(new HashMap<String, List<String>>() {
+ {
+ put(DEFAULT_FIRST_PARTITION_PATH,
createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, fileId1)));
+ put(DEFAULT_SECOND_PARTITION_PATH,
createImmutableList(baseFileName(DEFAULT_SECOND_PARTITION_PATH, fileId2)));
+ }
+ }, writes, updates);
+ }
+
+ private static HoodieReplaceCommitMetadata
generateReplaceCommitMetadata(HashMap<String, List<String>>
partitionToFilePaths, Option<Integer> writes, Option<Integer> updates) {
+ HoodieReplaceCommitMetadata metadata = new HoodieReplaceCommitMetadata();
+ partitionToFilePaths.forEach((key, value) -> value.forEach(f -> {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setPartitionPath(key);
+ writeStat.setPath(DEFAULT_PATH);
+ writeStat.setFileId(DEFAULT_FILEID);
+ writeStat.setTotalWriteBytes(DEFAULT_TOTAL_WRITE_BYTES);
+ writeStat.setPrevCommit(DEFAULT_PRE_COMMIT);
+ writeStat.setNumWrites(writes.orElse(DEFAULT_NUM_WRITES));
+ writeStat.setNumUpdateWrites(updates.orElse(DEFAULT_NUM_UPDATE_WRITES));
+ writeStat.setTotalLogBlocks(DEFAULT_TOTAL_LOG_BLOCKS);
+ writeStat.setTotalLogRecords(DEFAULT_TOTAL_LOG_RECORDS);
+ metadata.addWriteStat(key, writeStat);
+ }));
+ metadata.setPartitionToReplaceFileIds(new HashMap<String, List<String>>() {
+ {
+ // TODO fix: placeholder entry - hard-coded id "1", and only the first partition is populated
+ put(DEFAULT_FIRST_PARTITION_PATH,
createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, "1")));
+ }
+ });
+ return metadata;
+ }
+}