This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/rfc-15 by this push:
new 7d31f0a [RFC-15] Adding interfaces for HoodieMetadata,
HoodieMetadataWriter (#2266)
7d31f0a is described below
commit 7d31f0ab1aff1eed56fcb5c473c32fc7ed83847e
Author: vinoth chandar <[email protected]>
AuthorDate: Tue Dec 1 08:38:07 2020 -0800
[RFC-15] Adding interfaces for HoodieMetadata, HoodieMetadataWriter (#2266)
- moved MetadataReader to HoodieBackedTableMetadata, under the
HoodieTableMetadata interface
- moved MetadataWriter to HoodieBackedTableMetadataWriter, under the
HoodieTableMetadataWriter interface
- Pulled all the metrics into HoodieMetadataMetrics
- Writer now wraps the metadata, instead of extending it
- New enum for MetadataPartitionType
- Streamlined code flow inside HoodieBackedTableMetadataWriter w.r.t
initializing metadata state
---
.../apache/hudi/cli/commands/MetadataCommand.java | 64 +++-
.../hudi/client/AbstractHoodieWriteClient.java | 2 +-
.../org/apache/hudi/client/HoodieWriteClient.java | 6 +-
...r.java => HoodieBackedTableMetadataWriter.java} | 426 ++++++++++-----------
.../metadata/HoodieMetadataFileSystemView.java | 5 +-
.../hudi/metadata/HoodieTableMetadataWriter.java | 51 +++
.../java/org/apache/hudi/table/HoodieTable.java | 20 +-
.../hudi/table/HoodieTimelineArchiveLog.java | 3 +-
.../table/action/clean/CleanActionExecutor.java | 2 +-
.../hudi/table/action/clean/CleanPlanner.java | 3 +-
.../action/commit/BaseCommitActionExecutor.java | 2 +-
.../action/restore/BaseRestoreActionExecutor.java | 2 +-
.../rollback/BaseRollbackActionExecutor.java | 2 +-
...odieMetadata.java => TestHoodieFsMetadata.java} | 63 +--
...aReader.java => HoodieBackedTableMetadata.java} | 312 ++++-----------
.../HoodieMetadataMergedLogRecordScanner.java | 7 +-
.../hudi/metadata/HoodieMetadataMetrics.java | 148 +++++++
.../hudi/metadata/HoodieMetadataPayload.java | 7 +-
.../apache/hudi/metadata/HoodieTableMetadata.java | 86 +++++
.../hudi/metadata/MetadataPartitionType.java | 33 ++
20 files changed, 724 insertions(+), 520 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index a45e9b4..2eb9988 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -22,12 +22,14 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.utils.SparkUtil;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.metadata.HoodieMetadataReader;
-import org.apache.hudi.metadata.HoodieMetadataWriter;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
@@ -45,23 +47,45 @@ import java.util.Map;
*/
@Component
public class MetadataCommand implements CommandMarker {
+
private JavaSparkContext jsc;
+ private static String metadataBaseDirectory;
+
+ /**
+ * Sets the directory to store/read Metadata Table.
+ *
+ * This can be used to store the metadata table away from the dataset
directory.
+ * - Useful for testing as well as for using via the HUDI CLI so that the
actual dataset is not written to.
+ * - Useful for testing Metadata Table performance and operations on
existing datasets before enabling.
+ */
+ public static void setMetadataBaseDirectory(String metadataDir) {
+ ValidationUtils.checkState(metadataBaseDirectory == null,
+ "metadataBaseDirectory is already set to " + metadataBaseDirectory);
+ metadataBaseDirectory = metadataDir;
+ }
+
+ public static String getMetadataTableBasePath(String tableBasePath) {
+ if (metadataBaseDirectory != null) {
+ return metadataBaseDirectory;
+ }
+ return HoodieTableMetadata.getMetadataTableBasePath(tableBasePath);
+ }
@CliCommand(value = "metadata set", help = "Set options for Metadata Table")
public String set(@CliOption(key = {"metadataDir"},
help = "Directory to read/write metadata table (can be different from
dataset)", unspecifiedDefaultValue = "")
final String metadataDir) {
if (!metadataDir.isEmpty()) {
- HoodieMetadataReader.setMetadataBaseDirectory(metadataDir);
+ setMetadataBaseDirectory(metadataDir);
}
- return String.format("Ok");
+ return "Ok";
}
@CliCommand(value = "metadata create", help = "Create the Metadata Table if
it does not exist")
public String create() throws IOException {
HoodieCLI.getTableMetaClient();
- Path metadataPath = new
Path(HoodieMetadataReader.getMetadataTableBasePath(HoodieCLI.basePath));
+ Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
try {
FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
if (statuses.length > 0) {
@@ -75,15 +99,14 @@ public class MetadataCommand implements CommandMarker {
HoodieTimer timer = new HoodieTimer().startTimer();
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext();
- HoodieMetadataWriter.create(HoodieCLI.conf, writeConfig).initialize(jsc);
+ HoodieTableMetadataWriter.create(HoodieCLI.conf, writeConfig, jsc);
return String.format("Created Metadata Table in %s (duration=%.2f secs)",
metadataPath, timer.endTimer() / 1000.0);
}
@CliCommand(value = "metadata delete", help = "Remove the Metadata Table")
public String delete() throws Exception {
HoodieCLI.getTableMetaClient();
- HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
- Path metadataPath = new
Path(HoodieMetadataReader.getMetadataTableBasePath(HoodieCLI.basePath));
+ Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
try {
FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
if (statuses.length > 0) {
@@ -100,7 +123,7 @@ public class MetadataCommand implements CommandMarker {
public String init(@CliOption(key = {"readonly"}, unspecifiedDefaultValue =
"false",
help = "Open in read-only mode") final boolean readOnly) throws
Exception {
HoodieCLI.getTableMetaClient();
- Path metadataPath = new
Path(HoodieMetadataReader.getMetadataTableBasePath(HoodieCLI.basePath));
+ Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
try {
HoodieCLI.fs.listStatus(metadataPath);
} catch (FileNotFoundException e) {
@@ -114,7 +137,7 @@ public class MetadataCommand implements CommandMarker {
} else {
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext();
- HoodieMetadataWriter.create(HoodieCLI.conf, writeConfig).initialize(jsc);
+ HoodieTableMetadataWriter.create(HoodieCLI.conf, writeConfig, jsc);
}
long t2 = System.currentTimeMillis();
@@ -125,11 +148,11 @@ public class MetadataCommand implements CommandMarker {
@CliCommand(value = "metadata stats", help = "Print stats about the
metadata")
public String stats() throws IOException {
HoodieCLI.getTableMetaClient();
- HoodieMetadataReader metaReader = new HoodieMetadataReader(HoodieCLI.conf,
HoodieCLI.basePath, "/tmp", true, false);
- Map<String, String> stats = metaReader.getStats(true);
+ HoodieBackedTableMetadata metadata = new
HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true,
false, false);
+ Map<String, String> stats = metadata.stats();
StringBuffer out = new StringBuffer("\n");
- out.append(String.format("Base path: %s\n",
HoodieMetadataReader.getMetadataTableBasePath(HoodieCLI.basePath)));
+ out.append(String.format("Base path: %s\n",
getMetadataTableBasePath(HoodieCLI.basePath)));
for (Map.Entry<String, String> entry : stats.entrySet()) {
out.append(String.format("%s: %s\n", entry.getKey(), entry.getValue()));
}
@@ -140,15 +163,15 @@ public class MetadataCommand implements CommandMarker {
@CliCommand(value = "metadata list-partitions", help = "Print a list of all
partitions from the metadata")
public String listPartitions() throws IOException {
HoodieCLI.getTableMetaClient();
- HoodieMetadataReader metaReader = new HoodieMetadataReader(HoodieCLI.conf,
HoodieCLI.basePath, "/tmp", true, false);
+ HoodieBackedTableMetadata metadata = new
HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true,
false, false);
StringBuffer out = new StringBuffer("\n");
- if (!metaReader.enabled()) {
+ if (!metadata.enabled()) {
out.append("=== Metadata Table not initilized. Using file listing to get
list of partitions. ===\n\n");
}
long t1 = System.currentTimeMillis();
- List<String> partitions = metaReader.getAllPartitionPaths(HoodieCLI.fs,
HoodieCLI.basePath, false);
+ List<String> partitions = metadata.getAllPartitionPaths();
long t2 = System.currentTimeMillis();
int[] count = {0};
@@ -171,16 +194,15 @@ public class MetadataCommand implements CommandMarker {
@CliOption(key = {"partition"}, help = "Name of the partition to list
files", mandatory = true)
final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
- HoodieMetadataReader metaReader = new HoodieMetadataReader(HoodieCLI.conf,
HoodieCLI.basePath, "/tmp", true, false);
+ HoodieBackedTableMetadata metaReader = new
HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true,
false, false);
StringBuffer out = new StringBuffer("\n");
if (!metaReader.enabled()) {
- out.append("=== Metadata Table not initilized. Using file listing to get
list of files in partition. ===\n\n");
+ out.append("=== Metadata Table not initialized. Using file listing to
get list of files in partition. ===\n\n");
}
long t1 = System.currentTimeMillis();
- FileStatus[] statuses = metaReader.getAllFilesInPartition(HoodieCLI.conf,
HoodieCLI.basePath,
- new Path(HoodieCLI.basePath, partition));
+ FileStatus[] statuses = metaReader.getAllFilesInPartition(new
Path(HoodieCLI.basePath, partition));
long t2 = System.currentTimeMillis();
Arrays.stream(statuses).sorted((p1, p2) ->
p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(p -> {
diff --git
a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index a4015a6..eff299b 100644
---
a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++
b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -128,7 +128,7 @@ public abstract class AbstractHoodieWriteClient<T extends
HoodieRecordPayload> e
finalizeWrite(table, instantTime, stats);
try {
- table.metadata().update(jsc, metadata, instantTime);
+ table.metadataWriter(jsc).update(metadata, instantTime);
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType,
instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
diff --git
a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index c180a88..c6310ac 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -44,7 +44,7 @@ import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.metadata.HoodieMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
@@ -124,7 +124,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
this.rollbackPending = rollbackPending;
// Initialize Metadata Table
- HoodieMetadataWriter.create(hadoopConf, writeConfig).initialize(jsc);
+ HoodieTableMetadataWriter.create(hadoopConf, writeConfig, jsc);
}
/**
@@ -701,7 +701,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
finalizeWrite(table, compactionCommitTime, writeStats);
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished
with result " + metadata);
- table.metadata().update(jsc, metadata, compactionCommitTime);
+ table.metadataWriter(jsc).update(metadata, compactionCommitTime);
CompactHelpers.completeInflightCompaction(table, compactionCommitTime,
metadata);
diff --git
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
similarity index 68%
rename from
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
rename to
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 08a785d..992c240 100644
---
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -18,22 +18,6 @@
package org.apache.hudi.metadata;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -41,10 +25,11 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.ClientUtils;
+import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -63,6 +48,7 @@ import
org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -74,74 +60,100 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import scala.Tuple2;
+import static
org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
+import static
org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
+import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+
/**
- * Writer for Metadata Table.
- *
- * Partition and file listing are saved within an internal MOR table called
Metadata Table. This table is created
- * by listing files and partitions (first time) and kept in sync using the
instants on the main dataset.
+ * Writer implementation backed by an internal hudi table. Partition and file
listing are saved within an internal MOR table
+ * called Metadata Table. This table is created by listing files and
partitions (first time)
+ * and kept in sync using the instants on the main dataset.
*/
-public class HoodieMetadataWriter extends HoodieMetadataReader implements
Serializable {
- private static final Logger LOG =
LogManager.getLogger(HoodieMetadataWriter.class);
+public class HoodieBackedTableMetadataWriter implements
HoodieTableMetadataWriter {
- // Metric names
- public static final String INITIALIZE_STR = "initialize";
- public static final String SYNC_STR = "sync";
+ private static final Logger LOG =
LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
- private HoodieWriteConfig config;
+ private HoodieWriteConfig metadataWriteConfig;
+ private HoodieWriteConfig datasetWriteConfig;
private String tableName;
- private static Map<String, HoodieMetadataWriter> instances = new HashMap<>();
-
- public static HoodieMetadataWriter create(Configuration conf,
HoodieWriteConfig writeConfig) {
- String key = writeConfig.getBasePath();
- if (instances.containsKey(key)) {
- if (instances.get(key).enabled() !=
writeConfig.useFileListingMetadata()) {
- // Enabled state has changed. Remove so it is recreated.
- instances.remove(key);
- }
- }
- return instances.computeIfAbsent(key, k -> {
- try {
- return new HoodieMetadataWriter(conf, writeConfig);
- } catch (IOException e) {
- throw new HoodieMetadataException("Could not initialize
HoodieMetadataWriter", e);
- }
- });
- }
+ private HoodieBackedTableMetadata metadata;
+ private HoodieTableMetaClient metaClient;
+ private Option<HoodieMetadataMetrics> metrics;
+ private boolean enabled;
+ private SerializableConfiguration hadoopConf;
+ private final transient JavaSparkContext jsc;
- HoodieMetadataWriter(Configuration hadoopConf, HoodieWriteConfig
writeConfig) throws IOException {
- super(hadoopConf, writeConfig.getBasePath(),
writeConfig.getSpillableMapBasePath(),
- writeConfig.useFileListingMetadata(),
writeConfig.getFileListingMetadataVerify(), false);
+ HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig
writeConfig, JavaSparkContext jsc) {
+ this.datasetWriteConfig = writeConfig;
+ this.jsc = jsc;
+ this.hadoopConf = new SerializableConfiguration(hadoopConf);
if (writeConfig.useFileListingMetadata()) {
this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
- this.config = createMetadataWriteConfig(writeConfig);
+ this.metadataWriteConfig = createMetadataWriteConfig(writeConfig);
enabled = true;
// Inline compaction and auto clean is required as we dont expose this
table outside
- ValidationUtils.checkArgument(this.config.isAutoClean(), "Auto clean is
required for Metadata Compaction config");
- ValidationUtils.checkArgument(this.config.isInlineCompaction(), "Inline
compaction is required for Metadata Compaction config");
- // Metadata Table cannot have its metadata optimized
- ValidationUtils.checkArgument(this.config.shouldAutoCommit(), "Auto
commit is required for Metadata Table");
- ValidationUtils.checkArgument(!this.config.useFileListingMetadata(),
"File listing cannot be used for Metadata Table");
-
- if (config.isMetricsOn()) {
- if (config.isExecutorMetricsEnabled()) {
- metricsRegistry = Registry.getRegistry("HoodieMetadata",
DistributedRegistry.class.getName());
+ ValidationUtils.checkArgument(this.metadataWriteConfig.isAutoClean(),
"Auto clean is required for Metadata Compaction config");
+
ValidationUtils.checkArgument(this.metadataWriteConfig.isInlineCompaction(),
"Inline compaction is required for Metadata Compaction config");
+ // Metadata Table cannot have metadata listing turned on. (infinite
loop, much?)
+
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(),
"Auto commit is required for Metadata Table");
+
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(),
"File listing cannot be used for Metadata Table");
+
+ if (metadataWriteConfig.isMetricsOn()) {
+ Registry registry;
+ if (metadataWriteConfig.isExecutorMetricsEnabled()) {
+ registry = Registry.getRegistry("HoodieMetadata",
DistributedRegistry.class.getName());
} else {
- metricsRegistry = Registry.getRegistry("HoodieMetadata");
+ registry = Registry.getRegistry("HoodieMetadata");
}
+ this.metrics = Option.of(new HoodieMetadataMetrics(registry));
+ } else {
+ this.metrics = Option.empty();
+ }
+
+ HoodieTableMetaClient datasetMetaClient = new
HoodieTableMetaClient(hadoopConf, datasetWriteConfig.getBasePath());
+ initialize(jsc, datasetMetaClient);
+ if (enabled) {
+ // (re) init the metadata for reading.
+ initTableMetadata();
+
+ // This is always called even in case the table was created for the
first time. This is because
+ // initFromFilesystem() does file listing and hence may take a long
time during which some new updates
+ // may have occurred on the table. Hence, calling this always ensures
that the metadata is brought in sync
+ // with the active timeline.
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ syncFromInstants(jsc, datasetMetaClient);
+ metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR,
timer.endTimer()));
}
} else {
enabled = false;
+ this.metrics = Option.empty();
}
}
@@ -149,9 +161,8 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
* Create a {@code HoodieWriteConfig} to use for the Metadata Table.
*
* @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
- * @param schemaStr Metadata Table schema
*/
- private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig
writeConfig) throws IOException {
+ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig
writeConfig) {
int parallelism = writeConfig.getMetadataInsertParallelism();
// Create the write config for the metadata table by borrowing options
from the main write config.
@@ -168,7 +179,7 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
.withAvroSchemaValidate(true)
.withEmbeddedTimelineServerEnabled(false)
.withAssumeDatePartitioning(false)
- .withPath(metadataBasePath)
+
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
.forTable(tableName)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -217,16 +228,11 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
}
public HoodieWriteConfig getWriteConfig() {
- return config;
+ return metadataWriteConfig;
}
- /**
- * Reload the metadata table by syncing it based on the commits on the
dataset.
- */
- public void reload(JavaSparkContext jsc) throws IOException {
- if (enabled) {
- initialize(jsc);
- }
+ public HoodieTableMetadata metadata() {
+ return metadata;
}
/**
@@ -240,57 +246,37 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
* which are large in size (AVRO or JSON encoded and not compressed) and
incur considerable IO for de-serialization
* and decoding.
*/
- public void initialize(JavaSparkContext jsc) {
+ private void initialize(JavaSparkContext jsc, HoodieTableMetaClient
datasetMetaClient) {
try {
- if (metricsRegistry instanceof DistributedRegistry) {
- ((DistributedRegistry) metricsRegistry).register(jsc);
- }
+ metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
+ if (registry instanceof DistributedRegistry) {
+ ((DistributedRegistry) registry).register(jsc);
+ }
+ });
if (enabled) {
- initializeAndSync(jsc);
+ bootstrapIfNeeded(jsc, datasetMetaClient);
}
} catch (IOException e) {
- LOG.error("Failed to initialize metadata table. Metdata will be
disabled.", e);
+ LOG.error("Failed to initialize metadata table. Disabling the writer.",
e);
enabled = false;
}
}
- private void initializeAndSync(JavaSparkContext jsc) throws IOException {
- long t1 = System.currentTimeMillis();
-
- HoodieTableMetaClient datasetMetaClient = new
HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
- FileSystem fs = FSUtils.getFs(metadataBasePath, hadoopConf.get());
- boolean exists = fs.exists(new Path(metadataBasePath,
HoodieTableMetaClient.METAFOLDER_NAME));
+ private void initTableMetadata() {
+ this.metadata = new HoodieBackedTableMetadata(hadoopConf.get(),
datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath(),
+ datasetWriteConfig.useFileListingMetadata(),
datasetWriteConfig.getFileListingMetadataVerify(), false,
+ datasetWriteConfig.shouldAssumeDatePartitioning());
+ this.metaClient = metadata.getMetaClient();
+ }
+ private void bootstrapIfNeeded(JavaSparkContext jsc, HoodieTableMetaClient
datasetMetaClient) throws IOException {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ boolean exists = datasetMetaClient.getFs().exists(new
Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
if (!exists) {
// Initialize for the first time by listing partitions and files
directly from the file system
- initFromFilesystem(jsc, datasetMetaClient);
- } else {
- metaClient = ClientUtils.createMetaClient(hadoopConf.get(), config,
true);
- }
-
- /*
- // TODO: We may not be able to sync in certain cases (instants archived
etc)
- //if (!canSync(datasetMetaClient)) {
- // Need to recreate the table as sync has failed
- // TODO: delete the table
- // initFromFilesystem(datasetMetaClient);
- //}
- */
-
- // This is always called even in case the table was created for the first
time. This is because
- // initFromFilesystem() does file listing and hence may take a long time
during which some new updates
- // may have occurred on the table. Hence, calling this always ensures that
the metadata is brought in sync
- // with the active timeline.
- syncFromInstants(jsc, datasetMetaClient);
-
- // Publish some metrics
- long durationInMs = System.currentTimeMillis() - t1;
- // Time to initilize and sync
- if (exists) {
- updateMetrics(SYNC_STR, durationInMs);
- } else {
- updateMetrics(INITIALIZE_STR, durationInMs);
+ bootstrapFromFilesystem(jsc, datasetMetaClient);
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
}
}
@@ -299,7 +285,7 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
*
* @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
*/
- private void initFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
datasetMetaClient) throws IOException {
+ private void bootstrapFromFilesystem(JavaSparkContext jsc,
HoodieTableMetaClient datasetMetaClient) throws IOException {
ValidationUtils.checkState(enabled, "Metadata table cannot be initialized
as it is not enabled");
// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP
as the instant time for initial commit
@@ -313,24 +299,26 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
latestInstant = Option.of(instant);
}
}
- String createInstantTime = latestInstant.isPresent() ?
latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
- LOG.info("Creating a new metadata table in " + metadataBasePath + " at
instant " + createInstantTime);
- metaClient = HoodieTableMetaClient.initTableType(hadoopConf.get(),
metadataBasePath.toString(),
+
+ String createInstantTime =
latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+ LOG.info("Creating a new metadata table in " +
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
+
+ HoodieTableMetaClient.initTableType(hadoopConf.get(),
metadataWriteConfig.getBasePath(),
HoodieTableType.MERGE_ON_READ, tableName, "archived",
HoodieMetadataPayload.class.getName(),
HoodieFileFormat.HFILE.toString());
+ initTableMetadata();
// List all partitions in the basePath of the containing dataset
FileSystem fs = datasetMetaClient.getFs();
- List<String> partitions = FSUtils.getAllPartitionPaths(fs,
datasetBasePath, false);
+ List<String> partitions = FSUtils.getAllPartitionPaths(fs,
datasetWriteConfig.getBasePath(),
datasetWriteConfig.shouldAssumeDatePartitioning());
LOG.info("Initializing metadata table by using file listings in " +
partitions.size() + " partitions");
// List all partitions in parallel and collect the files in them
- final String dbasePath = datasetBasePath;
int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) +
1; // +1 to prevent 0 parallelism
JavaPairRDD<String, FileStatus[]> partitionFileListRDD =
jsc.parallelize(partitions, parallelism)
.mapToPair(partition -> {
FileSystem fsys = datasetMetaClient.getFs();
- FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new
Path(dbasePath, partition));
+ FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new
Path(datasetWriteConfig.getBasePath(), partition));
return new Tuple2<>(partition, statuses);
});
@@ -339,12 +327,12 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
// Create a HoodieCommitMetadata with writeStats for all discovered files
int[] stats = {0};
- HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
partitionFileList.forEach(t -> {
final String partition = t._1;
try {
- if (!fs.exists(new Path(datasetBasePath, partition + Path.SEPARATOR +
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
+ if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition +
Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
return;
}
} catch (IOException e) {
@@ -367,20 +355,20 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
writeStat.setPath(partition + Path.SEPARATOR +
status.getPath().getName());
writeStat.setPartitionPath(partition);
writeStat.setTotalWriteBytes(status.getLen());
- metadata.addWriteStat(partition, writeStat);
+ commitMetadata.addWriteStat(partition, writeStat);
stats[0] += 1;
});
// If the partition has no files then create a writeStat with no file
path
- if (metadata.getWriteStats(partition) == null) {
+ if (commitMetadata.getWriteStats(partition) == null) {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(partition);
- metadata.addWriteStat(partition, writeStat);
+ commitMetadata.addWriteStat(partition, writeStat);
}
});
LOG.info("Committing " + partitionFileList.size() + " partitions and " +
stats[0] + " files to metadata");
- update(jsc, metadata, createInstantTime);
+ update(commitMetadata, createInstantTime);
}
/**
@@ -388,71 +376,77 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
*
* @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
*/
- private void syncFromInstants(JavaSparkContext jsc, HoodieTableMetaClient
datasetMetaClient) throws IOException {
+ private void syncFromInstants(JavaSparkContext jsc, HoodieTableMetaClient
datasetMetaClient) {
ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it
is not enabled");
- List<HoodieInstant> instantsToSync = findInstantsToSync(datasetMetaClient);
- if (instantsToSync.isEmpty()) {
- return;
- }
+ try {
+ List<HoodieInstant> instantsToSync =
metadata.findInstantsToSync(datasetMetaClient);
+ if (instantsToSync.isEmpty()) {
+ return;
+ }
- LOG.info("Syncing " + instantsToSync.size() + " instants to metadata
table: " + instantsToSync);
-
- // Read each instant in order and sync it to metadata table
- final HoodieActiveTimeline timeline =
datasetMetaClient.getActiveTimeline();
- for (HoodieInstant instant : instantsToSync) {
- LOG.info("Syncing instant " + instant + " to metadata table");
-
- switch (instant.getAction()) {
- case HoodieTimeline.CLEAN_ACTION: {
- // CLEAN is synced from the
- // - inflight instant which contains the HoodieCleanerPlan, or
- // - complete instant which contains the HoodieCleanMetadata
- try {
- HoodieInstant inflightCleanInstant = new HoodieInstant(true,
instant.getAction(), instant.getTimestamp());
- ValidationUtils.checkArgument(inflightCleanInstant.isInflight());
- HoodieCleanerPlan cleanerPlan =
CleanerUtils.getCleanerPlan(datasetMetaClient, inflightCleanInstant);
- update(jsc, cleanerPlan, instant.getTimestamp());
- } catch (HoodieIOException e) {
- HoodieInstant cleanInstant = new HoodieInstant(false,
instant.getAction(), instant.getTimestamp());
- ValidationUtils.checkArgument(cleanInstant.isCompleted());
- HoodieCleanMetadata cleanMetadata =
CleanerUtils.getCleanerMetadata(datasetMetaClient, cleanInstant);
- update(jsc, cleanMetadata, instant.getTimestamp());
+ LOG.info("Syncing " + instantsToSync.size() + " instants to metadata
table: " + instantsToSync);
+
+ // Read each instant in order and sync it to metadata table
+ final HoodieActiveTimeline timeline =
datasetMetaClient.getActiveTimeline();
+ for (HoodieInstant instant : instantsToSync) {
+ LOG.info("Syncing instant " + instant + " to metadata table");
+
+ switch (instant.getAction()) {
+ case HoodieTimeline.CLEAN_ACTION: {
+ // CLEAN is synced from the
+ // - inflight instant which contains the HoodieCleanerPlan, or
+ // - complete instant which contains the HoodieCleanMetadata
+ try {
+ HoodieInstant inflightCleanInstant = new HoodieInstant(true,
instant.getAction(), instant.getTimestamp());
+ ValidationUtils.checkArgument(inflightCleanInstant.isInflight());
+ HoodieCleanerPlan cleanerPlan =
CleanerUtils.getCleanerPlan(datasetMetaClient, inflightCleanInstant);
+ update(cleanerPlan, instant.getTimestamp());
+ } catch (HoodieIOException e) {
+ HoodieInstant cleanInstant = new HoodieInstant(false,
instant.getAction(), instant.getTimestamp());
+ ValidationUtils.checkArgument(cleanInstant.isCompleted());
+ HoodieCleanMetadata cleanMetadata =
CleanerUtils.getCleanerMetadata(datasetMetaClient, cleanInstant);
+ update(cleanMetadata, instant.getTimestamp());
+ }
+ break;
+ }
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.COMPACTION_ACTION: {
+ ValidationUtils.checkArgument(instant.isCompleted());
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+ timeline.getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
+ update(commitMetadata, instant.getTimestamp());
+ break;
+ }
+ case HoodieTimeline.ROLLBACK_ACTION: {
+ ValidationUtils.checkArgument(instant.isCompleted());
+ HoodieRollbackMetadata rollbackMetadata =
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+ timeline.getInstantDetails(instant).get());
+ update(rollbackMetadata, instant.getTimestamp());
+ break;
+ }
+ case HoodieTimeline.RESTORE_ACTION: {
+ ValidationUtils.checkArgument(instant.isCompleted());
+ HoodieRestoreMetadata restoreMetadata =
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+ timeline.getInstantDetails(instant).get());
+ update(restoreMetadata, instant.getTimestamp());
+ break;
+ }
+ case HoodieTimeline.SAVEPOINT_ACTION: {
+ ValidationUtils.checkArgument(instant.isCompleted());
+ // Nothing to be done here
+ break;
+ }
+ default: {
+ throw new HoodieException("Unknown type of action " +
instant.getAction());
}
- break;
- }
- case HoodieTimeline.DELTA_COMMIT_ACTION:
- case HoodieTimeline.COMMIT_ACTION:
- case HoodieTimeline.COMPACTION_ACTION: {
- ValidationUtils.checkArgument(instant.isCompleted());
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
- timeline.getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
- update(jsc, commitMetadata, instant.getTimestamp());
- break;
- }
- case HoodieTimeline.ROLLBACK_ACTION: {
- ValidationUtils.checkArgument(instant.isCompleted());
- HoodieRollbackMetadata rollbackMetadata =
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
- timeline.getInstantDetails(instant).get());
- update(jsc, rollbackMetadata, instant.getTimestamp());
- break;
- }
- case HoodieTimeline.RESTORE_ACTION: {
- ValidationUtils.checkArgument(instant.isCompleted());
- HoodieRestoreMetadata restoreMetadata =
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
- timeline.getInstantDetails(instant).get());
- update(jsc, restoreMetadata, instant.getTimestamp());
- break;
- }
- case HoodieTimeline.SAVEPOINT_ACTION: {
- ValidationUtils.checkArgument(instant.isCompleted());
- // Nothing to be done here
- break;
- }
- default: {
- throw new HoodieException("Unknown type of action " +
instant.getAction());
}
}
+ // re-init the table metadata, for any future writes.
+ initTableMetadata();
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Unable to sync instants from data to
metadata table.", ioe);
}
}
@@ -462,7 +456,8 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
* @param commitMetadata {@code HoodieCommitMetadata}
* @param instantTime Timestamp at which the commit was performed
*/
- public void update(JavaSparkContext jsc, HoodieCommitMetadata
commitMetadata, String instantTime) {
+ @Override
+ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
if (!enabled) {
return;
}
@@ -494,12 +489,12 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
});
// New partitions created
- HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new
ArrayList<String>(allPartitions));
+ HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new
ArrayList<>(allPartitions));
records.add(record);
LOG.info("Updating at " + instantTime + " from Commit/" +
commitMetadata.getOperationType()
+ ". #partitions_updated=" + records.size());
- commit(jsc, prepRecords(jsc, records, METADATA_PARTITION_NAME),
instantTime);
+ commit(jsc, prepRecords(jsc, records,
MetadataPartitionType.FILES.partitionPath()), instantTime);
}
/**
@@ -508,7 +503,8 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
* @param cleanerPlan {@code HoodieCleanerPlan}
* @param instantTime Timestamp at which the clean plan was generated
*/
- public void update(JavaSparkContext jsc, HoodieCleanerPlan cleanerPlan,
String instantTime) {
+ @Override
+ public void update(HoodieCleanerPlan cleanerPlan, String instantTime) {
if (!enabled) {
return;
}
@@ -535,7 +531,7 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
LOG.info("Updating at " + instantTime + " from CleanerPlan.
#partitions_updated=" + records.size()
+ ", #files_deleted=" + fileDeleteCount[0]);
- commit(jsc, prepRecords(jsc, records, METADATA_PARTITION_NAME),
instantTime);
+ commit(jsc, prepRecords(jsc, records,
MetadataPartitionType.FILES.partitionPath()), instantTime);
}
/**
@@ -544,7 +540,8 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
* @param cleanMetadata {@code HoodieCleanMetadata}
* @param instantTime Timestamp at which the clean was completed
*/
- public void update(JavaSparkContext jsc, HoodieCleanMetadata cleanMetadata,
String instantTime) {
+ @Override
+ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
if (!enabled) {
return;
}
@@ -564,7 +561,7 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
LOG.info("Updating at " + instantTime + " from Clean.
#partitions_updated=" + records.size()
+ ", #files_deleted=" + fileDeleteCount[0]);
- commit(jsc, prepRecords(jsc, records, METADATA_PARTITION_NAME),
instantTime);
+ commit(jsc, prepRecords(jsc, records,
MetadataPartitionType.FILES.partitionPath()), instantTime);
}
/**
@@ -573,7 +570,8 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
* @param restoreMetadata {@code HoodieRestoreMetadata}
* @param instantTime Timestamp at which the restore was performed
*/
- public void update(JavaSparkContext jsc, HoodieRestoreMetadata
restoreMetadata, String instantTime) {
+ @Override
+ public void update(HoodieRestoreMetadata restoreMetadata, String
instantTime) {
if (!enabled) {
return;
}
@@ -592,7 +590,8 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
* @param rollbackMetadata {@code HoodieRollbackMetadata}
* @param instantTime Timestamp at which the rollback was performed
*/
- public void update(JavaSparkContext jsc, HoodieRollbackMetadata
rollbackMetadata, String instantTime) {
+ @Override
+ public void update(HoodieRollbackMetadata rollbackMetadata, String
instantTime) {
if (!enabled) {
return;
}
@@ -662,7 +661,7 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
// Rollbacks deletes instants from timeline. The instant being
rolled-back may not have been synced to the
// metadata table. Hence, the deleted filed need to be checked against
the metadata.
try {
- FileStatus[] existingStatuses = getAllFilesInPartition(new
Path(datasetBasePath, partition));
+ FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new
Path(metadata.getDatasetBasePath(), partition));
Set<String> currentFiles =
Arrays.stream(existingStatuses).map(s ->
s.getPath().getName()).collect(Collectors.toSet());
@@ -705,7 +704,7 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
LOG.info("Updating at " + instantTime + " from " + operation + ".
#partitions_updated=" + records.size()
+ ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" +
fileChangeCount[1]);
- commit(jsc, prepRecords(jsc, records, METADATA_PARTITION_NAME),
instantTime);
+ commit(jsc, prepRecords(jsc, records,
MetadataPartitionType.FILES.partitionPath()), instantTime);
}
/**
@@ -716,10 +715,9 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
*/
private synchronized void commit(JavaSparkContext jsc, JavaRDD<HoodieRecord>
recordRDD, String instantTime) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to
as it is not enabled");
+ metadata.closeReaders();
- closeReaders();
-
- try (HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config,
true)) {
+ try (HoodieWriteClient writeClient = new HoodieWriteClient(jsc,
metadataWriteConfig, true)) {
writeClient.startCommitWithTime(instantTime);
List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD,
instantTime).collect();
statuses.forEach(writeStatus -> {
@@ -730,15 +728,17 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
}
// Update total size of the metadata and count of base/log files
- Map<String, String> stats;
- try {
- stats = getStats(false);
- updateMetrics(Long.valueOf(stats.get(STAT_TOTAL_BASE_FILE_SIZE)),
- Long.valueOf(stats.get(STAT_TOTAL_LOG_FILE_SIZE)),
Integer.valueOf(stats.get(STAT_COUNT_BASE_FILES)),
- Integer.valueOf(stats.get(STAT_COUNT_LOG_FILES)));
- } catch (IOException e) {
- LOG.error("Could not publish metadata size metrics", e);
- }
+ metrics.ifPresent(m -> {
+ try {
+ Map<String, String> stats = m.getStats(false, metaClient, metadata);
+
m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
+
Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
+
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
+
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
+ } catch (HoodieIOException e) {
+ LOG.error("Could not publish metadata size metrics", e);
+ }
+ });
}
/**
@@ -748,16 +748,16 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
* base file.
*/
private JavaRDD<HoodieRecord> prepRecords(JavaSparkContext jsc,
List<HoodieRecord> records, String partitionName) {
- HoodieTable table = HoodieTable.create(metaClient, config,
hadoopConf.get());
+ HoodieTable table = HoodieTable.create(metadataWriteConfig,
hadoopConf.get());
SliceView fsView = table.getSliceView();
List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
- .map(s -> s.getBaseFile())
- .filter(b -> b.isPresent())
- .map(b -> b.get())
+ .map(FileSlice::getBaseFile)
+ .filter(Option::isPresent)
+ .map(Option::get)
.collect(Collectors.toList());
// All the metadata fits within a single base file
- if (partitionName.equals(METADATA_PARTITION_NAME)) {
+ if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
if (baseFiles.size() > 1) {
throw new HoodieMetadataException("Multiple base files found in
metadata partition");
}
@@ -770,10 +770,10 @@ public class HoodieMetadataWriter extends
HoodieMetadataReader implements Serial
instantTime = baseFiles.get(0).getCommitTime();
} else {
// If there is a log file then we can assume that it has the data
- List<HoodieLogFile> logFiles =
fsView.getLatestFileSlices(HoodieMetadataWriter.METADATA_PARTITION_NAME)
- .map(s -> s.getLatestLogFile())
- .filter(b -> b.isPresent())
- .map(b -> b.get())
+ List<HoodieLogFile> logFiles =
fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
+ .map(FileSlice::getLatestLogFile)
+ .filter(Option::isPresent)
+ .map(Option::get)
.collect(Collectors.toList());
if (logFiles.isEmpty()) {
// No base and log files. All are new inserts
diff --git
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index b91c6d4..8c23ea8 100644
---
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -47,9 +47,6 @@ public class HoodieMetadataFileSystemView extends
HoodieTableFileSystemView {
*/
@Override
protected FileStatus[] listPartition(Path partitionPath) throws IOException {
- FileStatus[] statuses =
hoodieTable.metadata().getAllFilesInPartition(metaClient.getHadoopConf(),
- metaClient.getBasePath(), partitionPath);
-
- return statuses;
+ return hoodieTable.metadata().getAllFilesInPartition(partitionPath);
}
}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
new file mode 100644
index 0000000..dc15e9d
--- /dev/null
+++
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+
+/**
+ * Interface that supports updating metadata for a given table, as actions
complete.
+ */
+public interface HoodieTableMetadataWriter extends Serializable {
+
+ static HoodieTableMetadataWriter create(Configuration conf,
HoodieWriteConfig writeConfig, JavaSparkContext jsc) {
+ return new HoodieBackedTableMetadataWriter(conf, writeConfig, jsc);
+ }
+
+ void update(HoodieCommitMetadata commitMetadata, String instantTime);
+
+ void update(HoodieCleanerPlan cleanerPlan, String instantTime);
+
+ void update(HoodieCleanMetadata cleanMetadata, String instantTime);
+
+ void update(HoodieRestoreMetadata restoreMetadata, String instantTime);
+
+ void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 471fc8d..936b04e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -63,7 +63,8 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
-import org.apache.hudi.metadata.HoodieMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.log4j.LogManager;
@@ -94,7 +95,9 @@ public abstract class HoodieTable<T extends
HoodieRecordPayload> implements Seri
private SerializableConfiguration hadoopConfiguration;
private transient FileSystemViewManager viewManager;
- private HoodieMetadataWriter metadataWriter;
+
+ private HoodieTableMetadataWriter metadataWriter;
+ private HoodieTableMetadata metadata;
protected final SparkTaskContextSupplier sparkTaskContextSupplier = new
SparkTaskContextSupplier();
@@ -632,11 +635,18 @@ public abstract class HoodieTable<T extends
HoodieRecordPayload> implements Seri
return getBaseFileFormat() == HoodieFileFormat.HFILE;
}
- public HoodieMetadataWriter metadata() {
- if (metadataWriter == null) {
- metadataWriter = HoodieMetadataWriter.create(hadoopConfiguration.get(),
config);
+ public HoodieTableMetadata metadata() {
+ if (metadata == null) {
+ metadata = HoodieTableMetadata.create(hadoopConfiguration.get(),
config.getBasePath(), config.getSpillableMapBasePath(),
+ config.useFileListingMetadata(),
config.getFileListingMetadataVerify(), config.isMetricsOn(),
config.shouldAssumeDatePartitioning());
}
+ return metadata;
+ }
+ public HoodieTableMetadataWriter metadataWriter(JavaSparkContext jsc) {
+ if (metadataWriter == null) {
+ metadataWriter =
HoodieTableMetadataWriter.create(hadoopConfiguration.get(), config, jsc);
+ }
return metadataWriter;
}
}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 2bffcc2..5d9c571 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -53,6 +53,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -216,7 +217,7 @@ public class HoodieTimelineArchiveLog {
// For metadata tables, ensure commits >= latest compaction commit are
retained. This is required for
// metadata table sync.
- if (table.metadata().isMetadataTable(config.getBasePath())) {
+ if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
Option<HoodieInstant> latestCompactionInstant =
table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
if (latestCompactionInstant.isPresent()) {
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index b8d4212..ebc8b66 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -256,7 +256,7 @@ public class CleanActionExecutor extends
BaseActionExecutor<HoodieCleanMetadata>
cleanStats
);
- table.metadata().update(jsc, cleanerPlan, cleanInstant.getTimestamp());
+ table.metadataWriter(jsc).update(cleanerPlan,
cleanInstant.getTimestamp());
table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeCleanMetadata(metadata));
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 0bde91a..8a2168c 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -185,8 +185,7 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>>
implements Serializa
*/
private List<String> getPartitionPathsForFullCleaning() throws IOException {
// Go to brute force mode of scanning all partitions
- return
hoodieTable.metadata().getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
- hoodieTable.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
+ return hoodieTable.metadata().getAllPartitionPaths();
}
/**
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 535fa40..1d3e469 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -226,7 +226,7 @@ public abstract class BaseCommitActionExecutor<T extends
HoodieRecordPayload<T>,
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats,
result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(),
getCommitActionType());
- table.metadata().update(jsc, metadata, instantTime);
+ table.metadataWriter(jsc).update(metadata, instantTime);
activeTimeline.saveAsComplete(new HoodieInstant(true,
getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 1f768de..1dec11d 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -93,7 +93,7 @@ public abstract class BaseRestoreActionExecutor extends
BaseActionExecutor<Hoodi
HoodieRestoreMetadata restoreMetadata =
TimelineMetadataUtils.convertRestoreMetadata(
instantTime, durationInMs, instantsRolledBack, instantToMetadata);
- table.metadata().update(jsc, restoreMetadata, instantTime);
+ table.metadataWriter(jsc).update(restoreMetadata, instantTime);
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true,
HoodieTimeline.RESTORE_ACTION, instantTime),
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 29eb533..4d455c0 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -112,7 +112,7 @@ public abstract class BaseRollbackActionExecutor extends
BaseActionExecutor<Hood
Collections.singletonList(instantToRollback),
stats);
if (!skipTimelinePublish) {
- table.metadata().update(jsc, rollbackMetadata, instantTime);
+ table.metadataWriter(jsc).update(rollbackMetadata, instantTime);
finishRollback(rollbackMetadata);
}
diff --git
a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
similarity index 93%
rename from
hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
rename to
hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
index 62962d5..7dfb67c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++
b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
@@ -21,6 +21,7 @@ package org.apache.hudi.metadata;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -39,6 +40,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ClientUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieKey;
@@ -75,8 +77,8 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
-public class TestHoodieMetadata extends HoodieClientTestHarness {
- private static final Logger LOG =
LogManager.getLogger(TestHoodieMetadata.class);
+public class TestHoodieFsMetadata extends HoodieClientTestHarness {
+ private static final Logger LOG =
LogManager.getLogger(TestHoodieFsMetadata.class);
@TempDir
public java.nio.file.Path tempFolder;
@@ -100,10 +102,9 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
dfs.mkdirs(new Path(basePath));
}
initMetaClient();
-
initTestDataGenerator();
- metadataTableBasePath =
HoodieMetadataReader.getMetadataTableBasePath(basePath);
+ metadataTableBasePath =
HoodieTableMetadata.getMetadataTableBasePath(basePath);
}
@AfterEach
@@ -159,15 +160,15 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
try (HoodieWriteClient client = new HoodieWriteClient<>(jsc,
getWriteConfig(true, true))) {
client.startCommitWithTime("005");
- List<String> partitions = metadata(client).getAllPartitionPaths(dfs,
basePath, false);
+ List<String> partitions =
metadataWriter(client).metadata().getAllPartitionPaths();
assertFalse(partitions.contains(nonPartitionDirectory),
"Must not contain the non-partition " + nonPartitionDirectory);
assertTrue(partitions.contains("p1"), "Must contain partition p1");
assertTrue(partitions.contains("p2"), "Must contain partition p2");
- FileStatus[] statuses =
metadata(client).getAllFilesInPartition(hadoopConf, basePath, new
Path(basePath, "p1"));
+ FileStatus[] statuses = metadata(client).getAllFilesInPartition(new
Path(basePath, "p1"));
assertTrue(statuses.length == 2);
- statuses = metadata(client).getAllFilesInPartition(hadoopConf, basePath,
new Path(basePath, "p2"));
+ statuses = metadata(client).getAllFilesInPartition(new Path(basePath,
"p2"));
assertTrue(statuses.length == 5);
}
}
@@ -272,6 +273,7 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses =
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+ assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Rollback of inserts
@@ -475,7 +477,7 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
* Metadata Table should be automatically compacted as per config.
*/
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = {false})
public void testArchivingAndCompaction(boolean asyncClean) throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
@@ -600,7 +602,7 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
List<WriteStatus> writeStatuses =
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
validateMetadata(client);
- List<String> metadataPartitions =
metadata(client).getAllPartitionPaths(dfs, basePath, false);
+ List<String> metadataPartitions =
metadata(client).getAllPartitionPaths();
assertTrue(metadataPartitions.contains(""), "Must contain empty
partition");
}
}
@@ -622,9 +624,9 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
validateMetadata(client);
Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");
-
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataWriter.INITIALIZE_STR
+ ".count"));
-
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataWriter.INITIALIZE_STR
+ ".totalDuration"));
-
assertEquals(metricsRegistry.getAllCounts().get(HoodieMetadataWriter.INITIALIZE_STR
+ ".count"), 1L);
+
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR
+ ".count"));
+
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR
+ ".totalDuration"));
+
assertEquals(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR
+ ".count"), 1L);
assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size"));
assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size"));
assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count"));
@@ -639,8 +641,8 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
*/
private void validateMetadata(HoodieWriteClient client) throws IOException {
HoodieWriteConfig config = client.getConfig();
- HoodieMetadataWriter metadata = metadata(client);
- assertFalse(metadata == null, "MetadataWriter should have been
initialized");
+ HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
+ assertNotNull(metadataWriter, "MetadataWriter should have been
initialized");
if (!config.useFileListingMetadata()) {
return;
}
@@ -648,16 +650,16 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
HoodieTimer timer = new HoodieTimer().startTimer();
// Validate write config for metadata table
- HoodieWriteConfig metadataWriteConfig = metadata.getWriteConfig();
+ HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata
table for metadata table");
assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify
for metadata table");
// Metadata table should be in sync with the dataset
- assertTrue(metadata.isInSync());
+ assertTrue(metadata(client).isInSync());
// Partitions should match
List<String> fsPartitions =
FSUtils.getAllFoldersWithPartitionMetaFile(dfs, basePath);
- List<String> metadataPartitions = metadata.getAllPartitionPaths(dfs,
basePath, false);
+ List<String> metadataPartitions =
metadataWriter.metadata().getAllPartitionPaths();
Collections.sort(fsPartitions);
Collections.sort(metadataPartitions);
@@ -679,7 +681,7 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
partitionPath = new Path(basePath, partition);
}
FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(dfs,
partitionPath);
- FileStatus[] metaStatuses =
metadata.getAllFilesInPartition(hadoopConf, basePath, partitionPath);
+ FileStatus[] metaStatuses =
metadataWriter.metadata().getAllFilesInPartition(partitionPath);
List<String> fsFileNames = Arrays.stream(fsStatuses)
.map(s -> s.getPath().getName()).collect(Collectors.toList());
List<String> metadataFilenames = Arrays.stream(metaStatuses)
@@ -700,9 +702,9 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
// FileSystemView should expose the same data
List<HoodieFileGroup> fileGroups =
tableView.getAllFileGroups(partition).collect(Collectors.toList());
- fileGroups.stream().forEach(g ->
LogManager.getLogger(TestHoodieMetadata.class).info(g));
- fileGroups.stream().forEach(g -> g.getAllBaseFiles().forEach(b ->
LogManager.getLogger(TestHoodieMetadata.class).info(b)));
- fileGroups.stream().forEach(g -> g.getAllFileSlices().forEach(s ->
LogManager.getLogger(TestHoodieMetadata.class).info(s)));
+ fileGroups.forEach(g ->
LogManager.getLogger(TestHoodieFsMetadata.class).info(g));
+ fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b ->
LogManager.getLogger(TestHoodieFsMetadata.class).info(b)));
+ fileGroups.forEach(g -> g.getAllFileSlices().forEach(s ->
LogManager.getLogger(TestHoodieFsMetadata.class).info(s)));
long numFiles = fileGroups.stream()
.mapToLong(g -> g.getAllBaseFiles().count() +
g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
@@ -718,7 +720,7 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
HoodieTableMetaClient metadataMetaClient = new
HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
// Metadata table should be in sync with the dataset
- assertTrue(metadata.isInSync());
+ assertTrue(metadataWriter.metadata().isInSync());
// Metadata table is MOR
assertEquals(metadataMetaClient.getTableType(),
HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
@@ -730,9 +732,9 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
// Metadata table has a fixed number of partitions
// Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that
function filters all directory
// in the .hoodie folder.
- List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(dfs,
metadata.getMetadataTableBasePath(basePath),
+ List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(dfs,
HoodieTableMetadata.getMetadataTableBasePath(basePath),
false);
- assertEquals(HoodieMetadataWriter.METADATA_ALL_PARTITIONS.length,
metadataTablePartitions.size());
+ assertEquals(MetadataPartitionType.values().length,
metadataTablePartitions.size());
// Metadata table should automatically compact and clean
// versions are +1 as autoclean / compaction happens end of commits
@@ -747,6 +749,7 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
"Should limit files to num versions configured");
}
+ List<FileSlice> slices =
fsView.getAllFileSlices(partition).collect(Collectors.toList());
assertTrue(fsView.getAllFileSlices(partition).count() <=
numFileVersions, "Should limit file slice to "
+ numFileVersions + " but was " +
fsView.getAllFileSlices(partition).count());
});
@@ -754,8 +757,14 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
LOG.info("Validation time=" + timer.endTimer());
}
- private HoodieMetadataWriter metadata(HoodieWriteClient client) {
- return HoodieMetadataWriter.create(hadoopConf, client.getConfig());
+ private HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteClient
client) {
+ return (HoodieBackedTableMetadataWriter)
HoodieTableMetadataWriter.create(hadoopConf, client.getConfig(), jsc);
+ }
+
+ private HoodieBackedTableMetadata metadata(HoodieWriteClient client) {
+ HoodieWriteConfig clientConfig = client.getConfig();
+ return (HoodieBackedTableMetadata) HoodieTableMetadata.create(hadoopConf,
clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(),
+ clientConfig.useFileListingMetadata(),
clientConfig.getFileListingMetadataVerify(), false,
clientConfig.shouldAssumeDatePartitioning());
}
// TODO: this can be moved to TestHarness after merge from master
@@ -774,7 +783,7 @@ public class TestHoodieMetadata extends
HoodieClientTestHarness {
boolean
enableMetrics) {
return
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2,
2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
- .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
+ .withAutoCommit(autoCommit).withAssumeDatePartitioning(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
* 1024 * 1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
.withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
similarity index 59%
rename from
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
rename to
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index c14f402..5ce933d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -19,11 +19,9 @@
package org.apache.hudi.metadata;
import java.io.IOException;
-import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -43,7 +41,6 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -64,112 +61,68 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
- * Reader for Metadata Table.
+ * Table metadata provided by an internal DFS backed Hudi metadata table.
*
* If the metadata table does not exist, RPC calls are used to retrieve file
listings from the file system.
* No updates are applied to the table and it is not synced.
*/
-public class HoodieMetadataReader implements Serializable {
- private static final Logger LOG =
LogManager.getLogger(HoodieMetadataReader.class);
-
- // Base path of the Metadata Table relative to the dataset (.hoodie/metadata)
- private static final String METADATA_TABLE_REL_PATH =
HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR
- + "metadata";
-
- // Table name suffix
- protected static final String METADATA_TABLE_NAME_SUFFIX = "_metadata";
-
- // Timestamp for a commit when the base dataset had not had any commits yet.
- protected static final String SOLO_COMMIT_TIMESTAMP = "00000000000000";
-
-
- // Name of partition which saves file listings
- public static final String METADATA_PARTITION_NAME = "metadata_partition";
- // List of all partitions
- public static final String[] METADATA_ALL_PARTITIONS =
{METADATA_PARTITION_NAME};
- // Key for the record which saves list of all partitions
- protected static final String RECORDKEY_PARTITION_LIST =
"__all_partitions__";
-
- // The partition name used for non-partitioned tables
- protected static final String NON_PARTITIONED_NAME = ".";
-
- // Metric names
- public static final String LOOKUP_PARTITIONS_STR = "lookup_partitions";
- public static final String LOOKUP_FILES_STR = "lookup_files";
- public static final String VALIDATE_PARTITIONS_STR = "validate_partitions";
- public static final String VALIDATE_FILES_STR = "validate_files";
- public static final String VALIDATE_ERRORS_STR = "validate_errors";
- public static final String SCAN_STR = "scan";
- public static final String BASEFILE_READ_STR = "basefile_read";
-
- // Stats names
- public static final String STAT_TOTAL_BASE_FILE_SIZE =
"totalBaseFileSizeInBytes";
- public static final String STAT_TOTAL_LOG_FILE_SIZE =
"totalLogFileSizeInBytes";
- public static final String STAT_COUNT_BASE_FILES = "baseFileCount";
- public static final String STAT_COUNT_LOG_FILES = "logFileCount";
- public static final String STAT_COUNT_PARTITION = "partitionCount";
- public static final String STAT_IN_SYNC = "isInSync";
- public static final String STAT_LAST_COMPACTION_TIMESTAMP =
"lastCompactionTimestamp";
-
- // A base directory where the metadata tables should be saved outside the
dataset directory.
- // This is used in tests on existing datasets.
- private static String metadataBaseDirectory;
-
- protected final SerializableConfiguration hadoopConf;
- protected final String datasetBasePath;
- protected final String metadataBasePath;
- protected Registry metricsRegistry;
- protected HoodieTableMetaClient metaClient;
- protected boolean enabled;
- private final boolean validateLookups;
- private long maxMemorySizeInBytes = 1024 * 1024 * 1024; // TODO
- private int bufferSize = 10 * 1024 * 1024; // TODO
+public class HoodieBackedTableMetadata implements HoodieTableMetadata {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieBackedTableMetadata.class);
+ private static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+ private static final int BUFFER_SIZE = 10 * 1024 * 1024;
- // Directory used for Splillable Map when merging records
- private String spillableMapDirectory;
+ private final SerializableConfiguration hadoopConf;
+ private final String datasetBasePath;
+ private final String metadataBasePath;
+ private final Option<HoodieMetadataMetrics> metrics;
+ private HoodieTableMetaClient metaClient;
+
+ private boolean enabled;
+ private final boolean validateLookups;
+ private final boolean assumeDatePartitioning;
+ // Directory used for Spillable Map when merging records
+ private final String spillableMapDirectory;
// Readers for the base and log file which store the metadata
private transient HoodieFileReader<GenericRecord> basefileReader;
private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
- /**
- * Create a the Metadata Table in read-only mode.
- */
- public HoodieMetadataReader(Configuration conf, String datasetBasePath,
String spillableMapDirectory,
- boolean enabled, boolean validateLookups) {
- this(conf, datasetBasePath, spillableMapDirectory, enabled,
validateLookups, false);
+ public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath,
String spillableMapDirectory,
+ boolean enabled, boolean validateLookups,
boolean assumeDatePartitioning) {
+ this(conf, datasetBasePath, spillableMapDirectory, enabled,
validateLookups, false, assumeDatePartitioning);
}
- /**
- * Create a the Metadata Table in read-only mode.
- */
- public HoodieMetadataReader(Configuration conf, String datasetBasePath,
String spillableMapDirectory,
- boolean enabled, boolean validateLookups,
boolean enableMetrics) {
+ public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath,
String spillableMapDirectory,
+ boolean enabled, boolean validateLookups,
boolean enableMetrics,
+ boolean assumeDatePartitioning) {
this.hadoopConf = new SerializableConfiguration(conf);
this.datasetBasePath = datasetBasePath;
- this.metadataBasePath = getMetadataTableBasePath(datasetBasePath);
+ this.metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
this.validateLookups = validateLookups;
this.spillableMapDirectory = spillableMapDirectory;
+ this.enabled = enabled;
+ this.assumeDatePartitioning = assumeDatePartitioning;
if (enabled) {
try {
- metaClient = new HoodieTableMetaClient(hadoopConf.get(),
metadataBasePath);
+ this.metaClient = new HoodieTableMetaClient(hadoopConf.get(),
metadataBasePath);
} catch (TableNotFoundException e) {
LOG.error("Metadata table was not found at path " + metadataBasePath);
- enabled = false;
+ this.enabled = false;
} catch (Exception e) {
LOG.error("Failed to initialize metadata table at path " +
metadataBasePath, e);
- enabled = false;
+ this.enabled = false;
}
} else {
LOG.info("Metadata table is disabled.");
}
if (enableMetrics) {
- metricsRegistry = Registry.getRegistry("HoodieMetadata");
+ this.metrics = Option.of(new
HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
+ } else {
+ this.metrics = Option.empty();
}
-
- this.enabled = enabled;
}
/**
@@ -180,21 +133,20 @@ public class HoodieMetadataReader implements Serializable
{
*
* On any errors retrieving the listing from the metadata, defaults to using
the file system listings.
*
- * @param fs The {@code FileSystem}
- * @param basePath Base path of the dataset
- * @param assumeDatePartitioning True if the dataset uses date based
partitioning
*/
- public List<String> getAllPartitionPaths(FileSystem fs, String basePath,
boolean assumeDatePartitioning)
+ @Override
+ public List<String> getAllPartitionPaths()
throws IOException {
if (enabled) {
try {
- return getAllPartitionPaths();
+ return fetchAllPartitionPaths();
} catch (Exception e) {
- LOG.error("Failed to retrive list of partition from metadata", e);
+ LOG.error("Failed to retrieve list of partition from metadata", e);
}
}
- return getAllPartitionPathsByListing(fs, basePath, assumeDatePartitioning);
+ FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
+ return FSUtils.getAllPartitionPaths(fs, datasetBasePath,
assumeDatePartitioning);
}
/**
@@ -205,29 +157,29 @@ public class HoodieMetadataReader implements Serializable
{
*
* On any errors retrieving the listing from the metadata, defaults to using
the file system listings.
*
- * @param hadoopConf {@code Configuration}
* @param partitionPath The absolute path of the partition to list
*/
- public FileStatus[] getAllFilesInPartition(Configuration hadoopConf, String
basePath, Path partitionPath)
+ @Override
+ public FileStatus[] getAllFilesInPartition(Path partitionPath)
throws IOException {
if (enabled) {
try {
- return getAllFilesInPartition(partitionPath);
+ return fetchAllFilesInPartition(partitionPath);
} catch (Exception e) {
LOG.error("Failed to retrive files in partition " + partitionPath + "
from metadata", e);
}
}
- return getAllFilesInPartitionByListing(hadoopConf, basePath,
partitionPath);
+ return FSUtils.getFs(partitionPath.toString(),
hadoopConf.get()).listStatus(partitionPath);
}
/**
* Returns a list of all partitions.
*/
- protected List<String> getAllPartitionPaths() throws IOException {
+ protected List<String> fetchAllPartitionPaths() throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();
Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord =
getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
- updateMetrics(LOOKUP_PARTITIONS_STR, timer.endTimer());
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
List<String> partitions = Collections.emptyList();
if (hoodieRecord.isPresent()) {
@@ -247,8 +199,8 @@ public class HoodieMetadataReader implements Serializable {
if (validateLookups) {
// Validate the Metadata Table data by listing the partitions from the
file system
timer.startTimer();
- List<String> actualPartitions =
getAllPartitionPathsByListing(metaClient.getFs(), datasetBasePath, false);
- updateMetrics(VALIDATE_PARTITIONS_STR, timer.endTimer());
+ List<String> actualPartitions =
FSUtils.getAllPartitionPaths(metaClient.getFs(), datasetBasePath, false);
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR,
timer.endTimer()));
Collections.sort(actualPartitions);
Collections.sort(partitions);
@@ -257,7 +209,7 @@ public class HoodieMetadataReader implements Serializable {
LOG.error("Partitions from metadata: " +
Arrays.toString(partitions.toArray()));
LOG.error("Partitions from file system: " +
Arrays.toString(actualPartitions.toArray()));
- updateMetrics(VALIDATE_ERRORS_STR, 0);
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
}
// Return the direct listing as it should be correct
@@ -273,15 +225,15 @@ public class HoodieMetadataReader implements Serializable
{
*
* @param partitionPath The absolute path of the partition
*/
- FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException {
+ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException
{
String partitionName = FSUtils.getRelativePartitionPath(new
Path(datasetBasePath), partitionPath);
- if (partitionName.equals("")) {
+ if (partitionName.isEmpty()) {
partitionName = NON_PARTITIONED_NAME;
}
HoodieTimer timer = new HoodieTimer().startTimer();
Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord =
getMergedRecordByKey(partitionName);
- updateMetrics(LOOKUP_FILES_STR, timer.endTimer());
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
FileStatus[] statuses = {};
if (hoodieRecord.isPresent()) {
@@ -299,7 +251,7 @@ public class HoodieMetadataReader implements Serializable {
// Ignore partition metadata file
FileStatus[] directStatuses =
metaClient.getFs().listStatus(partitionPath,
p ->
!p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
- updateMetrics(VALIDATE_FILES_STR, timer.endTimer());
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
List<String> directFilenames = Arrays.stream(directStatuses)
.map(s -> s.getPath().getName()).sorted()
@@ -314,7 +266,7 @@ public class HoodieMetadataReader implements Serializable {
LOG.error("File list from metadata: " +
Arrays.toString(metadataFilenames.toArray()));
LOG.error("File list from direct listing: " +
Arrays.toString(directFilenames.toArray()));
- updateMetrics(VALIDATE_ERRORS_STR, 0);
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
}
// Return the direct listing as it should be correct
@@ -325,16 +277,6 @@ public class HoodieMetadataReader implements Serializable {
return statuses;
}
- private FileStatus[] getAllFilesInPartitionByListing(Configuration
hadoopConf, String basePath, Path partitionPath)
- throws IOException {
- return FSUtils.getFs(partitionPath.toString(),
hadoopConf).listStatus(partitionPath);
- }
-
- private List<String> getAllPartitionPathsByListing(FileSystem fs, String
basePath, boolean assumeDatePartitioning)
- throws IOException {
- return FSUtils.getAllPartitionPaths(fs, basePath, assumeDatePartitioning);
- }
-
/**
* Retrieve the merged {@code HoodieRecord} mapped to the given key.
*
@@ -349,9 +291,9 @@ public class HoodieMetadataReader implements Serializable {
HoodieTimer timer = new HoodieTimer().startTimer();
Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
if (baseRecord.isPresent()) {
- hoodieRecord =
SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) baseRecord.get(),
+ hoodieRecord =
SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
metaClient.getTableConfig().getPayloadClass());
- updateMetrics(BASEFILE_READ_STR, timer.endTimer());
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
}
}
@@ -379,19 +321,18 @@ public class HoodieMetadataReader implements Serializable
{
return;
}
- long t1 = System.currentTimeMillis();
+ HoodieTimer timer = new HoodieTimer().startTimer();
// Metadata is in sync till the latest completed instant on the dataset
HoodieTableMetaClient datasetMetaClient = new
HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
Option<HoodieInstant> datasetLatestInstant =
datasetMetaClient.getActiveTimeline().filterCompletedInstants()
.lastInstant();
- String latestInstantTime = datasetLatestInstant.isPresent() ?
datasetLatestInstant.get().getTimestamp()
- : SOLO_COMMIT_TIMESTAMP;
+ String latestInstantTime =
datasetLatestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
// Find the latest file slice
HoodieTimeline timeline = metaClient.reloadActiveTimeline();
HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
- List<FileSlice> latestSlices =
fsView.getLatestFileSlices(METADATA_PARTITION_NAME).collect(Collectors.toList());
+ List<FileSlice> latestSlices =
fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
ValidationUtils.checkArgument(latestSlices.size() == 1);
// If the base file is present then create a reader
@@ -399,7 +340,6 @@ public class HoodieMetadataReader implements Serializable {
if (basefile.isPresent()) {
String basefilePath = basefile.get().getPath();
basefileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(),
new Path(basefilePath));
-
LOG.info("Opened metadata base file from " + basefilePath + " at instant
" + basefile.get().getCommitTime());
}
@@ -408,26 +348,26 @@ public class HoodieMetadataReader implements Serializable
{
.collect(Collectors.toList());
Option<HoodieInstant> lastInstant =
timeline.filterCompletedInstants().lastInstant();
- String latestMetaInstantTimestamp = lastInstant.isPresent() ?
lastInstant.get().getTimestamp()
- : SOLO_COMMIT_TIMESTAMP;
+ String latestMetaInstantTimestamp =
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
if (!HoodieTimeline.compareTimestamps(latestInstantTime,
HoodieTimeline.EQUALS, latestMetaInstantTimestamp)) {
- // TODO: This can be false positive if the metadata table had a
compaction or clean
+ // TODO(metadata): This can be false positive if the metadata table had
a compaction or clean
LOG.warn("Metadata has more recent instant " +
latestMetaInstantTimestamp + " than dataset " + latestInstantTime);
}
// Load the schema
Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- // TODO: The below code may open the metadata to include incomplete
instants on the dataset
+ // TODO(metadata): The below code may open the metadata to include
incomplete instants on the dataset
logRecordScanner =
new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(),
metadataBasePath,
- logFilePaths, schema, latestMetaInstantTimestamp,
maxMemorySizeInBytes, bufferSize,
+ logFilePaths, schema, latestMetaInstantTimestamp,
MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE,
spillableMapDirectory, null);
LOG.info("Opened metadata log files from " + logFilePaths + " at instant "
+ latestInstantTime
+ "(dataset instant=" + latestInstantTime + ", metadata instant=" +
latestMetaInstantTimestamp + ")");
- updateMetrics(SCAN_STR, System.currentTimeMillis() - t1);
+ metrics.ifPresent(metrics ->
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer()));
}
protected void closeReaders() {
@@ -441,8 +381,8 @@ public class HoodieMetadataReader implements Serializable {
/**
* Return {@code True} if all Instants from the dataset have been synced
with the Metadata Table.
*/
+ @Override
public boolean isInSync() {
- // There should not be any instants to sync
return enabled && findInstantsToSync().isEmpty();
}
@@ -464,19 +404,22 @@ public class HoodieMetadataReader implements Serializable
{
// If there has not been any compaction then the first delta commit
instant should be the one at which
// the metadata table was created. We should not sync any instants before
that creation time.
+ // FIXME(metadata): or it could be that compaction has not happened for a
while, right.
Option<HoodieInstant> oldestMetaInstant = Option.empty();
if (!compactionTimestamp.isPresent()) {
oldestMetaInstant =
metaTimeline.getDeltaCommitTimeline().filterCompletedInstants().firstInstant();
if (oldestMetaInstant.isPresent()) {
- // TODO: Ensure this is the instant at which we created the metadata
table
+ // FIXME(metadata): Ensure this is the instant at which we created the
metadata table
}
}
- String metaSyncTimestamp = compactionTimestamp.isPresent() ?
compactionTimestamp.get()
- : oldestMetaInstant.isPresent() ?
oldestMetaInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
+ String metaSyncTimestamp = compactionTimestamp.orElse(
+
oldestMetaInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP)
+ );
// Metadata table is updated when an instant is completed except for the
following:
// CLEAN: metadata table is updated during inflight. So for CLEAN we
accept inflight actions.
+ // FIXME(metadata): This need not be the case, right? It's risky to do
this?
List<HoodieInstant> datasetInstants =
datasetMetaClient.getActiveTimeline().getInstants()
.filter(i -> i.isCompleted() ||
(i.getAction().equals(HoodieTimeline.CLEAN_ACTION) && i.isInflight()))
.filter(i -> metaSyncTimestamp.isEmpty()
@@ -495,7 +438,7 @@ public class HoodieMetadataReader implements Serializable {
if (metadataInstantMap.containsKey(instant.getTimestamp())) {
// instant already synced to metadata table
if (!instantsToSync.isEmpty()) {
- // TODO: async clean and async compaction are not yet handled. They
have a timestamp which is in the past
+ // FIXME(metadata): async clean and async compaction are not yet
handled. They have a timestamp which is in the past
// (when the operation was scheduled) and even on completion they
retain their old timestamp.
LOG.warn("Found out-of-order already synced instant " + instant + ".
Instants to sync=" + instantsToSync);
}
@@ -509,11 +452,13 @@ public class HoodieMetadataReader implements Serializable
{
/**
* Return the timestamp of the latest compaction instant.
*/
+ @Override
public Option<String> getLatestCompactionTimestamp() {
if (!enabled) {
return Option.empty();
}
+ //FIXME(metadata): should we really reload this?
HoodieTimeline timeline = metaClient.reloadActiveTimeline();
Option<HoodieInstant> lastCompactionInstant =
timeline.filterCompletedInstants()
.filter(i ->
i.getAction().equals(HoodieTimeline.COMMIT_ACTION)).lastInstant();
@@ -525,114 +470,23 @@ public class HoodieMetadataReader implements
Serializable {
}
}
- public Map<String, String> getStats(boolean detailed) throws IOException {
- metaClient.reloadActiveTimeline();
- HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
- return getStats(fsView, detailed);
- }
-
- private Map<String, String> getStats(HoodieTableFileSystemView fsView,
boolean detailed) throws IOException {
- Map<String, String> stats = new HashMap<>();
-
- // Total size of the metadata and count of base/log files
- long totalBaseFileSizeInBytes = 0;
- long totalLogFileSizeInBytes = 0;
- int baseFileCount = 0;
- int logFileCount = 0;
- List<FileSlice> latestSlices =
fsView.getLatestFileSlices(METADATA_PARTITION_NAME).collect(Collectors.toList());
-
- for (FileSlice slice : latestSlices) {
- if (slice.getBaseFile().isPresent()) {
- totalBaseFileSizeInBytes +=
slice.getBaseFile().get().getFileStatus().getLen();
- ++baseFileCount;
- }
- Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
- while (it.hasNext()) {
- totalLogFileSizeInBytes += it.next().getFileStatus().getLen();
- ++logFileCount;
- }
- }
-
- stats.put(STAT_TOTAL_BASE_FILE_SIZE,
String.valueOf(totalBaseFileSizeInBytes));
- stats.put(STAT_TOTAL_LOG_FILE_SIZE,
String.valueOf(totalLogFileSizeInBytes));
- stats.put(STAT_COUNT_BASE_FILES, String.valueOf(baseFileCount));
- stats.put(STAT_COUNT_LOG_FILES, String.valueOf(logFileCount));
-
- if (detailed) {
- stats.put(STAT_COUNT_PARTITION,
String.valueOf(getAllPartitionPaths().size()));
- stats.put(STAT_IN_SYNC, String.valueOf(isInSync()));
- stats.put(STAT_LAST_COMPACTION_TIMESTAMP,
getLatestCompactionTimestamp().orElseGet(() -> "none"));
- }
-
- return stats;
- }
-
- protected void updateMetrics(String action, long durationInMs) {
- if (metricsRegistry == null) {
- return;
- }
-
- // Update sum of duration and total for count
- String countKey = action + ".count";
- String durationKey = action + ".totalDuration";
- metricsRegistry.add(countKey, 1);
- metricsRegistry.add(durationKey, durationInMs);
-
- LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=1)",
durationKey, durationInMs, countKey));
- }
-
- protected void updateMetrics(long totalBaseFileSizeInBytes, long
totalLogFileSizeInBytes, int baseFileCount,
- int logFileCount) {
- if (metricsRegistry == null) {
- return;
- }
-
- // Update sizes and count for metadata table's data files
- metricsRegistry.add("basefile.size", totalBaseFileSizeInBytes);
- metricsRegistry.add("logfile.size", totalLogFileSizeInBytes);
- metricsRegistry.add("basefile.count", baseFileCount);
- metricsRegistry.add("logfile.count", logFileCount);
-
- LOG.info(String.format("Updating metadata size metrics (basefile.size=%d,
logfile.size=%d, basefile.count=%d, "
- + "logfile.count=%d)", totalBaseFileSizeInBytes,
totalLogFileSizeInBytes, baseFileCount, logFileCount));
+ public boolean enabled() {
+ return enabled;
}
- /**
- * Return the base path of the Metadata Table.
- *
- * @param tableBasePath The base path of the dataset
- */
- public static String getMetadataTableBasePath(String tableBasePath) {
- if (metadataBaseDirectory != null) {
- return metadataBaseDirectory;
- }
-
- return tableBasePath + Path.SEPARATOR + METADATA_TABLE_REL_PATH;
+ public SerializableConfiguration getHadoopConf() {
+ return hadoopConf;
}
- /**
- * Returns {@code True} if the given path contains a metadata table.
- *
- * @param basePath The base path to check
- */
- public static boolean isMetadataTable(String basePath) {
- return basePath.endsWith(METADATA_TABLE_REL_PATH);
+ public String getDatasetBasePath() {
+ return datasetBasePath;
}
- /**
- * Sets the directory to store/read Metadata Table.
- *
- * This can be used to store the metadata table away from the dataset
directory.
- * - Useful for testing as well as for using via the HUDI CLI so that the
actual dataset is not written to.
- * - Useful for testing Metadata Table performance and operations on
existing datasets before enabling.
- */
- public static void setMetadataBaseDirectory(String metadataDir) {
- ValidationUtils.checkState(metadataBaseDirectory == null,
- "metadataBaseDirectory is already set to " + metadataBaseDirectory);
- metadataBaseDirectory = metadataDir;
+ public HoodieTableMetaClient getMetaClient() {
+ return metaClient;
}
- public boolean enabled() {
- return enabled;
+ public Map<String, String> stats() {
+ return metrics.map(m -> m.getStats(true, metaClient, this)).orElse(new
HashMap<>());
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
index 191bbcd..3f245ad 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
@@ -70,11 +70,6 @@ public class HoodieMetadataMergedLogRecordScanner extends
HoodieMergedLogRecordS
* @return {@code HoodieRecord} if key was found else {@code Option.empty()}
*/
public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String
key) {
- HoodieRecord record = records.get(key);
- if (record == null) {
- return Option.empty();
- }
-
- return Option.of(record);
+ return Option.ofNullable((HoodieRecord) records.get(key));
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
new file mode 100644
index 0000000..29a2219
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HoodieMetadataMetrics implements Serializable {
+
+ // Metric names
+ public static final String LOOKUP_PARTITIONS_STR = "lookup_partitions";
+ public static final String LOOKUP_FILES_STR = "lookup_files";
+ public static final String VALIDATE_PARTITIONS_STR = "validate_partitions";
+ public static final String VALIDATE_FILES_STR = "validate_files";
+ public static final String VALIDATE_ERRORS_STR = "validate_errors";
+ public static final String SCAN_STR = "scan";
+ public static final String BASEFILE_READ_STR = "basefile_read";
+ public static final String INITIALIZE_STR = "initialize";
+ public static final String SYNC_STR = "sync";
+
+ // Stats names
+ public static final String STAT_TOTAL_BASE_FILE_SIZE =
"totalBaseFileSizeInBytes";
+ public static final String STAT_TOTAL_LOG_FILE_SIZE =
"totalLogFileSizeInBytes";
+ public static final String STAT_COUNT_BASE_FILES = "baseFileCount";
+ public static final String STAT_COUNT_LOG_FILES = "logFileCount";
+ public static final String STAT_COUNT_PARTITION = "partitionCount";
+ public static final String STAT_IN_SYNC = "isInSync";
+ public static final String STAT_LAST_COMPACTION_TIMESTAMP =
"lastCompactionTimestamp";
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieMetadataMetrics.class);
+
+ private final Registry metricsRegistry;
+
+ public HoodieMetadataMetrics(Registry metricsRegistry) {
+ this.metricsRegistry = metricsRegistry;
+ }
+
+ public Map<String, String> getStats(boolean detailed, HoodieTableMetaClient
metaClient, HoodieTableMetadata metadata) {
+ try {
+ metaClient.reloadActiveTimeline();
+ HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+ return getStats(fsView, detailed, metadata);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Unable to get metadata stats.", ioe);
+ }
+ }
+
+ private Map<String, String> getStats(HoodieTableFileSystemView fsView,
boolean detailed, HoodieTableMetadata tableMetadata) throws IOException {
+ Map<String, String> stats = new HashMap<>();
+
+ // Total size of the metadata and count of base/log files
+ long totalBaseFileSizeInBytes = 0;
+ long totalLogFileSizeInBytes = 0;
+ int baseFileCount = 0;
+ int logFileCount = 0;
+ List<FileSlice> latestSlices =
fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
+
+ for (FileSlice slice : latestSlices) {
+ if (slice.getBaseFile().isPresent()) {
+ totalBaseFileSizeInBytes +=
slice.getBaseFile().get().getFileStatus().getLen();
+ ++baseFileCount;
+ }
+ Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
+ while (it.hasNext()) {
+ totalLogFileSizeInBytes += it.next().getFileStatus().getLen();
+ ++logFileCount;
+ }
+ }
+
+ stats.put(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE,
String.valueOf(totalBaseFileSizeInBytes));
+ stats.put(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE,
String.valueOf(totalLogFileSizeInBytes));
+ stats.put(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES,
String.valueOf(baseFileCount));
+ stats.put(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES,
String.valueOf(logFileCount));
+
+ if (detailed) {
+ stats.put(HoodieMetadataMetrics.STAT_COUNT_PARTITION,
String.valueOf(tableMetadata.getAllPartitionPaths().size()));
+ stats.put(HoodieMetadataMetrics.STAT_IN_SYNC,
String.valueOf(tableMetadata.isInSync()));
+ stats.put(HoodieMetadataMetrics.STAT_LAST_COMPACTION_TIMESTAMP,
tableMetadata.getLatestCompactionTimestamp().orElseGet(() -> "none"));
+ }
+
+ return stats;
+ }
+
+ protected void updateMetrics(String action, long durationInMs) {
+ if (metricsRegistry == null) {
+ return;
+ }
+
+ // Update sum of duration and total for count
+ String countKey = action + ".count";
+ String durationKey = action + ".totalDuration";
+ metricsRegistry.add(countKey, 1);
+ metricsRegistry.add(durationKey, durationInMs);
+
+ LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=1)",
durationKey, durationInMs, countKey));
+ }
+
+ protected void updateMetrics(long totalBaseFileSizeInBytes, long
totalLogFileSizeInBytes, int baseFileCount,
+ int logFileCount) {
+ if (metricsRegistry == null) {
+ return;
+ }
+
+ // Update sizes and count for metadata table's data files
+ metricsRegistry.add("basefile.size", totalBaseFileSizeInBytes);
+ metricsRegistry.add("logfile.size", totalLogFileSizeInBytes);
+ metricsRegistry.add("basefile.count", baseFileCount);
+ metricsRegistry.add("logfile.count", logFileCount);
+
+ LOG.info(String.format("Updating metadata size metrics (basefile.size=%d,
logfile.size=%d, basefile.count=%d, "
+ + "logfile.count=%d)", totalBaseFileSizeInBytes,
totalLogFileSizeInBytes, baseFileCount, logFileCount));
+ }
+
+ public Registry registry() {
+ return metricsRegistry;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 087984c..d99b342 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -40,8 +40,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.hudi.metadata.HoodieMetadataReader.METADATA_PARTITION_NAME;
-import static
org.apache.hudi.metadata.HoodieMetadataReader.RECORDKEY_PARTITION_LIST;
+import static
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
/**
* This is a payload which saves information about a single entry in the
Metadata Table.
@@ -100,7 +99,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
- HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
METADATA_PARTITION_NAME);
+ HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.partitionPath());
HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
return new HoodieRecord<>(key, payload);
}
@@ -120,7 +119,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
filesDeleted.ifPresent(
m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
- HoodieKey key = new HoodieKey(partition, METADATA_PARTITION_NAME);
+ HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.partitionPath());
HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
return new HoodieRecord<>(key, payload);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
new file mode 100644
index 0000000..3a1a7a4
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface that supports querying various pieces of metadata about a hudi
table.
+ */
+public interface HoodieTableMetadata extends Serializable {
+
+ // Table name suffix
+ String METADATA_TABLE_NAME_SUFFIX = "_metadata";
+ // Timestamp for a commit when the base dataset had not had any commits yet.
+ String SOLO_COMMIT_TIMESTAMP = "00000000000000";
+ // Key for the record which saves list of all partitions
+ String RECORDKEY_PARTITION_LIST = "__all_partitions__";
+ // The partition name used for non-partitioned tables
+ String NON_PARTITIONED_NAME = ".";
+
+ // Base path of the Metadata Table relative to the dataset (.hoodie/metadata)
+ static final String METADATA_TABLE_REL_PATH =
HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR + "metadata";
+
+ /**
+ * Return the base path of the Metadata Table.
+ *
+ * @param tableBasePath The base path of the dataset
+ */
+ static String getMetadataTableBasePath(String tableBasePath) {
+ return tableBasePath + Path.SEPARATOR + METADATA_TABLE_REL_PATH;
+ }
+
+ /**
+ * Returns {@code True} if the given path contains a metadata table.
+ *
+ * @param basePath The base path to check
+ */
+ static boolean isMetadataTable(String basePath) {
+ return basePath.endsWith(METADATA_TABLE_REL_PATH);
+ }
+
+ static HoodieTableMetadata create(Configuration conf, String
datasetBasePath, String spillableMapPath, boolean useFileListingFromMetadata,
+ boolean verifyListings, boolean
enableMetrics, boolean shouldAssumeDatePartitioning) {
+ return new HoodieBackedTableMetadata(conf, datasetBasePath,
spillableMapPath, useFileListingFromMetadata, verifyListings,
+ enableMetrics, shouldAssumeDatePartitioning);
+ }
+
+ /**
+ * Fetch all the files at the given partition path, per the latest snapshot
of the metadata.
+ */
+ FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException;
+
+ /**
+ * Fetch list of all partition paths, per the latest snapshot of the
metadata.
+ */
+ List<String> getAllPartitionPaths() throws IOException;
+
+ Option<String> getLatestCompactionTimestamp();
+
+ boolean isInSync();
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
new file mode 100644
index 0000000..0436de7
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
/**
 * Types of partitions within the Metadata Table, each mapped to its relative partition path.
 */
public enum MetadataPartitionType {

  /** Partition holding file-listing metadata. */
  FILES("files");

  // Relative path of this partition within the metadata table.
  private final String partitionPath;

  MetadataPartitionType(final String path) {
    this.partitionPath = path;
  }

  /**
   * @return the relative partition path for this metadata partition type.
   */
  public String partitionPath() {
    return partitionPath;
  }
}