nsivabalan commented on a change in pull request #4092:
URL: https://github.com/apache/hudi/pull/4092#discussion_r756342835
##########
File path:
hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
##########
@@ -40,25 +45,32 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* CLI commands to operate on the Metadata Table.
Review comment:
Can you add an example here?
For example:
  connect
  set the Spark env variables
  run a sample metadata command.
##########
File path:
hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
##########
@@ -152,72 +163,192 @@ public String stats() throws IOException {
config, HoodieCLI.basePath, "/tmp");
Map<String, String> stats = metadata.stats();
- StringBuffer out = new StringBuffer("\n");
- out.append(String.format("Base path: %s\n",
getMetadataTableBasePath(HoodieCLI.basePath)));
+ final List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, String> entry : stats.entrySet()) {
- out.append(String.format("%s: %s\n", entry.getKey(), entry.getValue()));
+ Comparable[] row = new Comparable[2];
+ row[0] = entry.getKey();
+ row[1] = entry.getValue();
+ rows.add(row);
}
- return out.toString();
+ TableHeader header = new TableHeader()
+ .addTableHeaderField("stat key")
+ .addTableHeaderField("stat value");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
}
- @CliCommand(value = "metadata list-partitions", help = "Print a list of all
partitions from the metadata")
+ @CliCommand(value = "metadata list-partitions", help = "List all partitions
from metadata")
public String listPartitions() throws IOException {
HoodieCLI.getTableMetaClient();
initJavaSparkContext();
HoodieMetadataConfig config =
HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new
HoodieSparkEngineContext(jsc), config,
HoodieCLI.basePath, "/tmp");
- StringBuffer out = new StringBuffer("\n");
if (!metadata.enabled()) {
- out.append("=== Metadata Table not initilized. Using file listing to get
list of partitions. ===\n\n");
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
- long t1 = System.currentTimeMillis();
+ HoodieTimer timer = new HoodieTimer().startTimer();
List<String> partitions = metadata.getAllPartitionPaths();
- long t2 = System.currentTimeMillis();
-
- int[] count = {0};
- partitions.stream().sorted((p1, p2) -> p2.compareTo(p1)).forEach(p -> {
- out.append(p);
- if (++count[0] % 15 == 0) {
- out.append("\n");
- } else {
- out.append(", ");
- }
+ LOG.debug("Took " + timer.endTimer() + " ms");
+
+ final List<Comparable[]> rows = new ArrayList<>();
+ partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
+ Comparable[] row = new Comparable[1];
+ row[0] = p;
+ rows.add(row);
+ LOG.debug(">> partition " + p);
});
- out.append(String.format("\n\n=== List of partitions retrieved in %.2fsec
===", (t2 - t1) / 1000.0));
-
- return out.toString();
+ TableHeader header = new TableHeader().addTableHeaderField("partition");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
}
@CliCommand(value = "metadata list-files", help = "Print a list of all files
in a partition from the metadata")
public String listFiles(
- @CliOption(key = {"partition"}, help = "Name of the partition to list
files", mandatory = true)
- final String partition) throws IOException {
+ @CliOption(key = {"partition"}, help = "Name of the partition to list
files", mandatory = true) final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config =
HoodieMetadataConfig.newBuilder().enable(true).build();
- HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(new
HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+ HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), config,
HoodieCLI.basePath, "/tmp");
- StringBuffer out = new StringBuffer("\n");
if (!metaReader.enabled()) {
- out.append("=== Metadata Table not initialized. Using file listing to
get list of files in partition. ===\n\n");
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
- long t1 = System.currentTimeMillis();
+ HoodieTimer timer = new HoodieTimer().startTimer();
FileStatus[] statuses = metaReader.getAllFilesInPartition(new
Path(HoodieCLI.basePath, partition));
- long t2 = System.currentTimeMillis();
+ LOG.debug("Took " + timer.endTimer() + " ms");
- Arrays.stream(statuses).sorted((p1, p2) ->
p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(p -> {
- out.append("\t" + p.getPath().getName());
- out.append("\n");
+ final List<Comparable[]> rows = new ArrayList<>();
+ Arrays.stream(statuses).sorted((p1, p2) ->
p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(f -> {
+ Comparable[] row = new Comparable[1];
+ row[0] = f;
+ rows.add(row);
});
- out.append(String.format("\n=== Files in partition retrieved in %.2fsec
===", (t2 - t1) / 1000.0));
+ TableHeader header = new TableHeader().addTableHeaderField("file path");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
+ }
+
+ @CliCommand(value = "metadata validate-files", help = "Validate all files in
all partitions from the metadata")
+ public String validateFiles(
+ @CliOption(key = {"verbose"}, help = "Print all file details",
unspecifiedDefaultValue = "false") final boolean verbose) throws IOException {
+ HoodieCLI.getTableMetaClient();
+ HoodieMetadataConfig config =
HoodieMetadataConfig.newBuilder().enable(true).build();
+ HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), config,
HoodieCLI.basePath, "/tmp");
+
+ if (!metaReader.enabled()) {
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
+ }
- return out.toString();
+ HoodieMetadataConfig fsConfig =
HoodieMetadataConfig.newBuilder().enable(false).build();
+ HoodieBackedTableMetadata fsMetaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig,
HoodieCLI.basePath, "/tmp");
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List<String> metadataPartitions = metaReader.getAllPartitionPaths();
+ List<String> fsPartitions = fsMetaReader.getAllPartitionPaths();
+ Collections.sort(fsPartitions);
+ Collections.sort(metadataPartitions);
+
+ Set<String> allPartitions = new HashSet<>();
+ allPartitions.addAll(fsPartitions);
+ allPartitions.addAll(metadataPartitions);
+
+ LOG.info("All FS partitions count " + fsPartitions.size() + ", metadata
partition count " + metadataPartitions.size());
+ LOG.info("Partitions equality " + fsPartitions.equals(metadataPartitions));
Review comment:
Can you clean up the info logging? The draft I gave you had a lot of
additional logging for debugging purposes. L286 to L297 should suffice. Maybe
we can log some stats here instead, such as the total partition count and the
total file size per partition.
##########
File path: hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
##########
@@ -62,39 +63,59 @@ public static SparkLauncher initLauncher(String
propertiesFile) throws URISyntax
return sparkLauncher;
}
- public static JavaSparkContext initJavaSparkConf(String name) {
- return initJavaSparkConf(name, Option.empty(), Option.empty());
- }
-
- public static JavaSparkContext initJavaSparkConf(String name, Option<String>
master,
- Option<String> executorMemory) {
- SparkConf sparkConf = new SparkConf().setAppName(name);
+ /**
+ * Get the default spark configuration.
+ *
+ * @param appName - Spark application name
+ * @param sparkMaster - Spark master node name
+ * @return Spark configuration
+ */
+ public static SparkConf getDefaultConf(final String appName, final
Option<String> sparkMaster) {
+ final Properties properties = System.getProperties();
+ SparkConf sparkConf = new SparkConf().setAppName(appName);
- String defMaster =
master.orElse(sparkConf.getenv(HoodieCliSparkConfig.CLI_SPARK_MASTER));
- if ((null == defMaster) || (defMaster.isEmpty())) {
- sparkConf.setMaster(DEFAULT_SPARK_MASTER);
- } else {
- sparkConf.setMaster(defMaster);
+ // Configure the sparkMaster
+ String sparkMasterNode = DEFAULT_SPARK_MASTER;
+ if (properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER) != null)
{
+ sparkMasterNode =
properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER);
}
+ sparkMasterNode = sparkMaster.orElse(sparkMasterNode);
+ sparkConf.setMaster(sparkMasterNode);
- sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER,
"org.apache.spark.serializer.KryoSerializer");
+ // Configure driver
sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
- sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "true");
- if (executorMemory.isPresent()) {
- sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY,
executorMemory.get());
- }
+ sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "false");
+ sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER,
"org.apache.spark.serializer.KryoSerializer");
// Configure hadoop conf
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC,
"true");
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC,
"org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE,
"BLOCK");
+ return sparkConf;
+ }
+
+ public static JavaSparkContext initJavaSparkConf(String name) {
+ return initJavaSparkConf(name, Option.empty(), Option.empty());
+ }
+
+ public static JavaSparkContext initJavaSparkConf(String name, Option<String>
master, Option<String> executorMemory) {
Review comment:
Where exactly is the 3rd arg set to a non-empty value? I mean, from which caller?
##########
File path:
hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
##########
@@ -152,72 +163,192 @@ public String stats() throws IOException {
config, HoodieCLI.basePath, "/tmp");
Map<String, String> stats = metadata.stats();
- StringBuffer out = new StringBuffer("\n");
- out.append(String.format("Base path: %s\n",
getMetadataTableBasePath(HoodieCLI.basePath)));
+ final List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, String> entry : stats.entrySet()) {
- out.append(String.format("%s: %s\n", entry.getKey(), entry.getValue()));
+ Comparable[] row = new Comparable[2];
+ row[0] = entry.getKey();
+ row[1] = entry.getValue();
+ rows.add(row);
}
- return out.toString();
+ TableHeader header = new TableHeader()
+ .addTableHeaderField("stat key")
+ .addTableHeaderField("stat value");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
}
- @CliCommand(value = "metadata list-partitions", help = "Print a list of all
partitions from the metadata")
+ @CliCommand(value = "metadata list-partitions", help = "List all partitions
from metadata")
public String listPartitions() throws IOException {
HoodieCLI.getTableMetaClient();
initJavaSparkContext();
HoodieMetadataConfig config =
HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new
HoodieSparkEngineContext(jsc), config,
HoodieCLI.basePath, "/tmp");
- StringBuffer out = new StringBuffer("\n");
if (!metadata.enabled()) {
- out.append("=== Metadata Table not initilized. Using file listing to get
list of partitions. ===\n\n");
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
- long t1 = System.currentTimeMillis();
+ HoodieTimer timer = new HoodieTimer().startTimer();
List<String> partitions = metadata.getAllPartitionPaths();
- long t2 = System.currentTimeMillis();
-
- int[] count = {0};
- partitions.stream().sorted((p1, p2) -> p2.compareTo(p1)).forEach(p -> {
- out.append(p);
- if (++count[0] % 15 == 0) {
- out.append("\n");
- } else {
- out.append(", ");
- }
+ LOG.debug("Took " + timer.endTimer() + " ms");
+
+ final List<Comparable[]> rows = new ArrayList<>();
+ partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
+ Comparable[] row = new Comparable[1];
+ row[0] = p;
+ rows.add(row);
+ LOG.debug(">> partition " + p);
Review comment:
Do we need this debug statement? In my understanding, we are already
printing the value of "p" via HoodiePrintHelper anyway.
##########
File path:
hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
##########
@@ -152,72 +163,192 @@ public String stats() throws IOException {
config, HoodieCLI.basePath, "/tmp");
Map<String, String> stats = metadata.stats();
- StringBuffer out = new StringBuffer("\n");
- out.append(String.format("Base path: %s\n",
getMetadataTableBasePath(HoodieCLI.basePath)));
+ final List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, String> entry : stats.entrySet()) {
- out.append(String.format("%s: %s\n", entry.getKey(), entry.getValue()));
+ Comparable[] row = new Comparable[2];
+ row[0] = entry.getKey();
+ row[1] = entry.getValue();
+ rows.add(row);
}
- return out.toString();
+ TableHeader header = new TableHeader()
+ .addTableHeaderField("stat key")
+ .addTableHeaderField("stat value");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
}
- @CliCommand(value = "metadata list-partitions", help = "Print a list of all
partitions from the metadata")
+ @CliCommand(value = "metadata list-partitions", help = "List all partitions
from metadata")
public String listPartitions() throws IOException {
HoodieCLI.getTableMetaClient();
initJavaSparkContext();
HoodieMetadataConfig config =
HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new
HoodieSparkEngineContext(jsc), config,
HoodieCLI.basePath, "/tmp");
- StringBuffer out = new StringBuffer("\n");
if (!metadata.enabled()) {
- out.append("=== Metadata Table not initilized. Using file listing to get
list of partitions. ===\n\n");
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
- long t1 = System.currentTimeMillis();
+ HoodieTimer timer = new HoodieTimer().startTimer();
List<String> partitions = metadata.getAllPartitionPaths();
- long t2 = System.currentTimeMillis();
-
- int[] count = {0};
- partitions.stream().sorted((p1, p2) -> p2.compareTo(p1)).forEach(p -> {
- out.append(p);
- if (++count[0] % 15 == 0) {
- out.append("\n");
- } else {
- out.append(", ");
- }
+ LOG.debug("Took " + timer.endTimer() + " ms");
+
+ final List<Comparable[]> rows = new ArrayList<>();
+ partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
+ Comparable[] row = new Comparable[1];
+ row[0] = p;
+ rows.add(row);
+ LOG.debug(">> partition " + p);
});
- out.append(String.format("\n\n=== List of partitions retrieved in %.2fsec
===", (t2 - t1) / 1000.0));
-
- return out.toString();
+ TableHeader header = new TableHeader().addTableHeaderField("partition");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
}
@CliCommand(value = "metadata list-files", help = "Print a list of all files
in a partition from the metadata")
public String listFiles(
- @CliOption(key = {"partition"}, help = "Name of the partition to list
files", mandatory = true)
- final String partition) throws IOException {
+ @CliOption(key = {"partition"}, help = "Name of the partition to list
files", mandatory = true) final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config =
HoodieMetadataConfig.newBuilder().enable(true).build();
- HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(new
HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+ HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), config,
HoodieCLI.basePath, "/tmp");
- StringBuffer out = new StringBuffer("\n");
if (!metaReader.enabled()) {
- out.append("=== Metadata Table not initialized. Using file listing to
get list of files in partition. ===\n\n");
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
- long t1 = System.currentTimeMillis();
+ HoodieTimer timer = new HoodieTimer().startTimer();
FileStatus[] statuses = metaReader.getAllFilesInPartition(new
Path(HoodieCLI.basePath, partition));
- long t2 = System.currentTimeMillis();
+ LOG.debug("Took " + timer.endTimer() + " ms");
- Arrays.stream(statuses).sorted((p1, p2) ->
p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(p -> {
- out.append("\t" + p.getPath().getName());
- out.append("\n");
+ final List<Comparable[]> rows = new ArrayList<>();
+ Arrays.stream(statuses).sorted((p1, p2) ->
p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(f -> {
+ Comparable[] row = new Comparable[1];
+ row[0] = f;
+ rows.add(row);
});
- out.append(String.format("\n=== Files in partition retrieved in %.2fsec
===", (t2 - t1) / 1000.0));
+ TableHeader header = new TableHeader().addTableHeaderField("file path");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
+ }
+
+ @CliCommand(value = "metadata validate-files", help = "Validate all files in
all partitions from the metadata")
+ public String validateFiles(
+ @CliOption(key = {"verbose"}, help = "Print all file details",
unspecifiedDefaultValue = "false") final boolean verbose) throws IOException {
+ HoodieCLI.getTableMetaClient();
+ HoodieMetadataConfig config =
HoodieMetadataConfig.newBuilder().enable(true).build();
+ HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), config,
HoodieCLI.basePath, "/tmp");
+
+ if (!metaReader.enabled()) {
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
+ }
- return out.toString();
+ HoodieMetadataConfig fsConfig =
HoodieMetadataConfig.newBuilder().enable(false).build();
+ HoodieBackedTableMetadata fsMetaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig,
HoodieCLI.basePath, "/tmp");
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List<String> metadataPartitions = metaReader.getAllPartitionPaths();
+ List<String> fsPartitions = fsMetaReader.getAllPartitionPaths();
+ Collections.sort(fsPartitions);
+ Collections.sort(metadataPartitions);
+
+ Set<String> allPartitions = new HashSet<>();
+ allPartitions.addAll(fsPartitions);
+ allPartitions.addAll(metadataPartitions);
+
+ LOG.info("All FS partitions count " + fsPartitions.size() + ", metadata
partition count " + metadataPartitions.size());
+ LOG.info("Partitions equality " + fsPartitions.equals(metadataPartitions));
+
+ if (!fsPartitions.equals(metadataPartitions)) {
+ LOG.error("All FS partitions " +
Arrays.toString(fsPartitions.toArray()));
+ LOG.error("All Metadata partitions " +
Arrays.toString(metadataPartitions.toArray()));
+ }
+
+ final List<Comparable[]> rows = new ArrayList<>();
+ for (String partition : allPartitions) {
+ Map<String, FileStatus> fileStatusMap = new HashMap<>();
+ Map<String, FileStatus> metaFileStatusMap = new HashMap<>();
+ FileStatus[] metadataStatuses = metaReader.getAllFilesInPartition(new
Path(HoodieCLI.basePath, partition));
+ Arrays.stream(metadataStatuses).forEach(entry ->
metaFileStatusMap.put(entry.getPath().getName(), entry));
+ FileStatus[] fsStatus = fsMetaReader.getAllFilesInPartition(new
Path(HoodieCLI.basePath, partition));
+ Arrays.stream(fsStatus).forEach(entry ->
fileStatusMap.put(entry.getPath().getName(), entry));
+
+ Set<String> allFiles = new HashSet<>();
+ allFiles.addAll(fileStatusMap.keySet());
+ allFiles.addAll(metaFileStatusMap.keySet());
+
+ for (String file : allFiles) {
+ Comparable[] row = new Comparable[6];
+ row[0] = partition;
+ FileStatus fsFileStatus = fileStatusMap.get(file);
+ FileStatus metaFileStatus = metaFileStatusMap.get(file);
+ row[1] = file;
+ row[2] = fsFileStatus != null;
+ row[3] = metaFileStatus != null;
+ row[4] = (fsFileStatus != null) ? fsFileStatus.getLen() : 0;
+ row[5] = (metaFileStatus != null) ? metaFileStatus.getLen() : 0;
+ if (verbose) {
+ rows.add(row);
+ }
+ }
+
+ LOG.info("Validating partition " + partition);
+ LOG.info(" total FS Files count " + metadataStatuses.length + ",
metadata files count " + fsStatus.length);
+ if (metadataStatuses.length != fsStatus.length) {
+ LOG.error(" Files equality size failed ");
+ }
+
+ for (Map.Entry<String, FileStatus> entry : fileStatusMap.entrySet()) {
Review comment:
I was referring to these code blocks. Please check whether they give us any
additional benefit compared to what we already print using HoodiePrintHelper.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]