vinothchandar commented on a change in pull request #2441:
URL: https://github.com/apache/hudi/pull/2441#discussion_r556945779
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -192,7 +196,8 @@ public CleanPlanner(HoodieTable<T, I, K, O> hoodieTable,
HoodieWriteConfig confi
*/
private List<String> getPartitionPathsForFullCleaning() throws IOException {
// Go to brute force mode of scanning all partitions
- return hoodieTable.metadata().getAllPartitionPaths();
+ return FSUtils.getAllPartitionPaths(context,
hoodieTable.getMetaClient().getFs(), config.getBasePath(),
Review comment:
this should also take `HoodieMetadataConfig` as an arg
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -198,14 +200,20 @@ public boolean archiveIfRequired(HoodieEngineContext
context) throws IOException
// If metadata table is enabled, do not archive instants which are more
recent that the latest synced
// instant on the metadata table. This is required for metadata table sync.
if (config.useFileListingMetadata()) {
- Option<String> lastSyncedInstantTime =
table.metadata().getSyncedInstantTime();
- if (lastSyncedInstantTime.isPresent()) {
- LOG.info("Limiting archiving of instants to last synced instant on
metadata table at " + lastSyncedInstantTime.get());
- instants = instants.filter(i ->
HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
- lastSyncedInstantTime.get()));
- } else {
- LOG.info("Not archiving as there is no instants yet on the metadata
table");
- instants = Stream.empty();
+ try (HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(table.getContext(), config.getBasePath(), "/tmp/",
Review comment:
do we just pass `/tmp` always? `HoodieTableMetadata#create()` should
just take `HoodieMetadataConfig` as arg
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
##########
@@ -192,42 +195,58 @@ private static RemoteHoodieTableFileSystemView
createRemoteFileSystemView(Serial
metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
}
+ public static FileSystemViewManager createViewManager(final
HoodieEngineContext context,
+ final
HoodieMetadataConfig metadataConfig,
+ final
FileSystemViewStorageConfig config) {
+ return createViewManager(context, metadataConfig, config,
(SerializableSupplier<HoodieTableMetadata>) null);
+ }
+
+ public static FileSystemViewManager createViewManager(final
HoodieEngineContext context,
+ final
HoodieMetadataConfig metadataConfig,
+ final
FileSystemViewStorageConfig config,
+ final String basePath)
{
+ return createViewManager(context, metadataConfig, config,
+ () -> HoodieTableMetadata.create(context, basePath,
FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR,
+ metadataConfig.useFileListingMetadata(),
metadataConfig.getFileListingMetadataVerify(),
+ false, metadataConfig.shouldAssumeDatePartitioning()));
+ }
+
/**
* Main Factory method for building file-system views.
- *
- * @param conf Hadoop Configuration
- * @param config View Storage Configuration
- * @return
+ *
*/
- public static FileSystemViewManager createViewManager(final
SerializableConfiguration conf,
- final FileSystemViewStorageConfig config) {
+ public static FileSystemViewManager createViewManager(final
HoodieEngineContext context,
+ final
HoodieMetadataConfig metadataConfig,
+ final
FileSystemViewStorageConfig config,
+ final
SerializableSupplier<HoodieTableMetadata> metadataSupplier) {
LOG.info("Creating View Manager with storage type :" +
config.getStorageType());
+ final SerializableConfiguration conf = context.getHadoopConf();
switch (config.getStorageType()) {
case EMBEDDED_KV_STORE:
LOG.info("Creating embedded rocks-db based Table View");
- return new FileSystemViewManager(conf, config,
+ return new FileSystemViewManager(context, config,
(metaClient, viewConf) -> createRocksDBBasedFileSystemView(conf,
viewConf, metaClient));
case SPILLABLE_DISK:
LOG.info("Creating Spillable Disk based Table View");
- return new FileSystemViewManager(conf, config,
+ return new FileSystemViewManager(context, config,
(metaClient, viewConf) ->
createSpillableMapBasedFileSystemView(conf, viewConf, metaClient));
case MEMORY:
LOG.info("Creating in-memory based Table View");
- return new FileSystemViewManager(conf, config,
- (metaClient, viewConfig) -> createInMemoryFileSystemView(conf,
viewConfig, metaClient));
+ return new FileSystemViewManager(context, config,
Review comment:
I don't really like overloading the in-memory based view or passing a
supplier specifically for the metadata table, but this is unavoidable, given
how much benefit there is to sharing it.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -209,15 +217,25 @@ protected BaseTableMetadata(HoodieEngineContext
engineContext, String datasetBas
// Validate the Metadata Table data by listing the partitions from the
file system
timer.startTimer();
- // Ignore partition metadata file
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
- FileStatus[] directStatuses =
metaClient.getFs().listStatus(partitionPath,
- p ->
!p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
-
- List<String> directFilenames = Arrays.stream(directStatuses)
- .map(s -> s.getPath().getName()).sorted()
+ String partitionPathStr = FSUtils.getRelativePartitionPath(new
Path(datasetMetaClient.getBasePath()), partitionPath);
+ String latestDataInstantTime = getLatestDatasetInstantTime();
+ HoodieTableFileSystemView dataFsView = new
HoodieTableFileSystemView(datasetMetaClient,
datasetMetaClient.getActiveTimeline());
Review comment:
this validation stuff is actually tricky. We should do it once at the
end of each metadata sync alone. If we turn this on mid stream during a commit,
then the filesystem will report the newly written/inflight files as well, which
the metadata table does not have, and validation will always fail. This is kind
of broken atm really.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,7 +100,11 @@ protected void initialize(HoodieEngineContext
engineContext, HoodieTableMetaClie
@Override
protected void commit(List<HoodieRecord> records, String partitionName,
String instantTime) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to
as it is not enabled");
- metadata.closeReaders();
+ try {
Review comment:
moving this to `initTableMetadata()`
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
##########
@@ -755,6 +754,9 @@ public void testMetadataOutOfSync() throws Exception {
private void validateMetadata(SparkRDDWriteClient client) throws IOException
{
HoodieWriteConfig config = client.getConfig();
+ client.close();
Review comment:
this is a hack for now. Need to clean up such that a new client is
created.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -94,31 +93,32 @@
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
- protected final transient HoodieEngineContext context;
protected final HoodieIndex<T, I, K, O> index;
-
private SerializableConfiguration hadoopConfiguration;
- private transient FileSystemViewManager viewManager;
- private HoodieTableMetadata metadata;
-
protected final TaskContextSupplier taskContextSupplier;
+ private final HoodieTableMetadata metadata;
+
+ private transient FileSystemViewManager viewManager;
+ protected final transient HoodieEngineContext context;
protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
this.config = config;
this.hadoopConfiguration = context.getHadoopConf();
- this.viewManager =
FileSystemViewManager.createViewManager(hadoopConfiguration,
- config.getViewStorageConfig());
+ this.context = context;
+ this.metadata = HoodieTableMetadata.create(context, config.getBasePath(),
FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR,
+ config.getMetadataConfig().useFileListingMetadata(),
config.getMetadataConfig().getFileListingMetadataVerify(),
+ false, config.getMetadataConfig().shouldAssumeDatePartitioning());
+ this.viewManager = FileSystemViewManager.createViewManager(context,
config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata);
this.metaClient = metaClient;
this.index = getIndex(config, context);
- this.context = context;
this.taskContextSupplier = context.getTaskContextSupplier();
}
protected abstract HoodieIndex<T, I, K, O> getIndex(HoodieWriteConfig
config, HoodieEngineContext context);
private synchronized FileSystemViewManager getViewManager() {
if (null == viewManager) {
- viewManager =
FileSystemViewManager.createViewManager(hadoopConfiguration,
config.getViewStorageConfig());
+ viewManager = FileSystemViewManager.createViewManager(getContext(),
config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata);
Review comment:
this is done so that the `metadata` object is instantiated just once
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
##########
@@ -907,12 +909,12 @@ private HoodieWriteConfig getWriteConfig(boolean
autoCommit, boolean useFileList
private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit,
boolean useFileListingMetadata, boolean enableMetrics) {
return
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2,
2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
- .withAutoCommit(autoCommit).withAssumeDatePartitioning(false)
+ .withAutoCommit(autoCommit)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
* 1024 * 1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
.withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 *
1024 * 1024).build())
- .withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table")
+ .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
Review comment:
should consider some tests to run with this also parameterized.
##########
File path:
hudi-client/hudi-spark-client/src/test/resources/log4j-surefire.properties
##########
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
###
-log4j.rootLogger=WARN, CONSOLE
+log4j.rootLogger=INFO, CONSOLE
Review comment:
revert
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -65,14 +67,15 @@
// Directory used for Spillable Map when merging records
protected final String spillableMapDirectory;
- private transient HoodieMetadataMergedInstantRecordScanner
timelineRecordScanner;
+ private TimelineMergedTableMetadata timelineMergedMetadata;
Review comment:
this is also serialized now, so we don't list the data timeline for
merging over and over.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -63,6 +63,12 @@
public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX +
".cleaner.commits.retained";
public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
+ public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP =
"hoodie.assume.date.partitioning";
Review comment:
consolidated everything that affects listing etc here.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -265,37 +280,28 @@ protected BaseTableMetadata(HoodieEngineContext
engineContext, String datasetBas
return mergedRecord;
}
- protected abstract Option<HoodieRecord<HoodieMetadataPayload>>
getRecordByKeyFromMetadata(String key) throws IOException;
+ protected abstract Option<HoodieRecord<HoodieMetadataPayload>>
getRecordByKeyFromMetadata(String key);
- private void openTimelineScanner() throws IOException {
- if (timelineRecordScanner != null) {
- // Already opened
- return;
+ private void openTimelineScanner() {
Review comment:
happens once on init
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -265,37 +280,28 @@ protected BaseTableMetadata(HoodieEngineContext
engineContext, String datasetBas
return mergedRecord;
}
- protected abstract Option<HoodieRecord<HoodieMetadataPayload>>
getRecordByKeyFromMetadata(String key) throws IOException;
+ protected abstract Option<HoodieRecord<HoodieMetadataPayload>>
getRecordByKeyFromMetadata(String key);
- private void openTimelineScanner() throws IOException {
- if (timelineRecordScanner != null) {
- // Already opened
- return;
+ private void openTimelineScanner() {
+ if (timelineMergedMetadata == null) {
+ List<HoodieInstant> unSyncedInstants = findInstantsToSync();
+ timelineMergedMetadata =
+ new TimelineMergedTableMetadata(datasetMetaClient, unSyncedInstants,
getSyncedInstantTime(), null);
}
-
- HoodieTableMetaClient datasetMetaClient = new
HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
- List<HoodieInstant> unSyncedInstants =
findInstantsToSync(datasetMetaClient);
- Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- timelineRecordScanner =
- new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient,
unSyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES,
spillableMapDirectory, null);
}
- protected List<HoodieInstant> findInstantsToSync() {
- HoodieTableMetaClient datasetMetaClient = new
HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
Review comment:
no extra metaclient creations.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -98,79 +109,75 @@ public HoodieBackedTableMetadata(HoodieEngineContext
engineContext, String datas
}
@Override
- protected Option<HoodieRecord<HoodieMetadataPayload>>
getRecordByKeyFromMetadata(String key) throws IOException {
- openBaseAndLogFiles();
+ protected Option<HoodieRecord<HoodieMetadataPayload>>
getRecordByKeyFromMetadata(String key) {
+ try {
+ openBaseAndLogFiles();
+ // Retrieve record from base file
+ HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
+ if (baseFileReader != null) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
+ if (baseRecord.isPresent()) {
+ hoodieRecord =
SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+ metaClient.getTableConfig().getPayloadClass());
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
+ }
+ }
- // Retrieve record from base file
- HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
- if (baseFileReader != null) {
- HoodieTimer timer = new HoodieTimer().startTimer();
- Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
- if (baseRecord.isPresent()) {
- hoodieRecord =
SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
- metaClient.getTableConfig().getPayloadClass());
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
+ // Retrieve record from log file
+ Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord =
logRecordScanner.getRecordByKey(key);
+ if (logHoodieRecord.isPresent()) {
+ if (hoodieRecord != null) {
+ // Merge the payloads
+ HoodieRecordPayload mergedPayload =
logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
+ hoodieRecord = new HoodieRecord(hoodieRecord.getKey(),
mergedPayload);
+ } else {
+ hoodieRecord = logHoodieRecord.get();
+ }
}
- }
- // Retrieve record from log file
- Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord =
logRecordScanner.getRecordByKey(key);
- if (logHoodieRecord.isPresent()) {
- if (hoodieRecord != null) {
- // Merge the payloads
- HoodieRecordPayload mergedPayload =
logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
- hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
- } else {
- hoodieRecord = logHoodieRecord.get();
+ return Option.ofNullable(hoodieRecord);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from metadata table
for key :" + key, ioe);
+ } finally {
+ try {
Review comment:
ugh. not nice
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -81,10 +86,16 @@ public HoodieBackedTableMetadata(Configuration conf, String
datasetBasePath, Str
public HoodieBackedTableMetadata(HoodieEngineContext engineContext, String
datasetBasePath, String spillableMapDirectory,
boolean enabled, boolean validateLookups,
boolean enableMetrics, boolean assumeDatePartitioning) {
super(engineContext, datasetBasePath, spillableMapDirectory, enabled,
validateLookups, enableMetrics, assumeDatePartitioning);
- this.metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
- if (enabled) {
+ initIfNeeded();
+ }
+
+ private void initIfNeeded() {
+ if (enabled && this.metaClient == null) {
+ this.metadataBasePath =
HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
try {
this.metaClient = new HoodieTableMetaClient(hadoopConf.get(),
metadataBasePath);
+ HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+ latestFileSystemMetadataSlices =
fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
Review comment:
list just once lazily
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -98,79 +109,75 @@ public HoodieBackedTableMetadata(HoodieEngineContext
engineContext, String datas
}
@Override
- protected Option<HoodieRecord<HoodieMetadataPayload>>
getRecordByKeyFromMetadata(String key) throws IOException {
- openBaseAndLogFiles();
+ protected Option<HoodieRecord<HoodieMetadataPayload>>
getRecordByKeyFromMetadata(String key) {
+ try {
+ openBaseAndLogFiles();
+ // Retrieve record from base file
+ HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
+ if (baseFileReader != null) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
+ if (baseRecord.isPresent()) {
+ hoodieRecord =
SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+ metaClient.getTableConfig().getPayloadClass());
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
+ }
+ }
- // Retrieve record from base file
- HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
- if (baseFileReader != null) {
- HoodieTimer timer = new HoodieTimer().startTimer();
- Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
- if (baseRecord.isPresent()) {
- hoodieRecord =
SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
- metaClient.getTableConfig().getPayloadClass());
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
+ // Retrieve record from log file
+ Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord =
logRecordScanner.getRecordByKey(key);
+ if (logHoodieRecord.isPresent()) {
+ if (hoodieRecord != null) {
+ // Merge the payloads
+ HoodieRecordPayload mergedPayload =
logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
+ hoodieRecord = new HoodieRecord(hoodieRecord.getKey(),
mergedPayload);
+ } else {
+ hoodieRecord = logHoodieRecord.get();
+ }
}
- }
- // Retrieve record from log file
- Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord =
logRecordScanner.getRecordByKey(key);
- if (logHoodieRecord.isPresent()) {
- if (hoodieRecord != null) {
- // Merge the payloads
- HoodieRecordPayload mergedPayload =
logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
- hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
- } else {
- hoodieRecord = logHoodieRecord.get();
+ return Option.ofNullable(hoodieRecord);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from metadata table
for key :" + key, ioe);
+ } finally {
+ try {
+ close();
+ } catch (Exception e) {
+ throw new HoodieException("Error closing resources during metadata
table merge, for key :" + key, e);
}
}
-
- return Option.ofNullable(hoodieRecord);
}
/**
* Open readers to the base and log files.
*/
- protected synchronized void openBaseAndLogFiles() throws IOException {
- if (logRecordScanner != null) {
- // Already opened
- return;
- }
+ private synchronized void openBaseAndLogFiles() throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();
// Metadata is in sync till the latest completed instant on the dataset
- HoodieTableMetaClient datasetMetaClient = new
HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
- String latestInstantTime =
datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
- .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
- // Find the latest file slice
Review comment:
all these done only once on init
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
##########
@@ -36,9 +36,10 @@
private final HoodieTableMetadata tableMetadata;
Review comment:
this instance is not closed, but it's not the responsibility of this
object
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]