the-other-tim-brown commented on code in PR #13402:
URL: https://github.com/apache/hudi/pull/13402#discussion_r2145775792
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -317,33 +318,40 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table, String c
     }
     compactionTimer = metrics.getCompactionCtx();
     HoodieWriteMetadata<T> writeMetadata = table.compact(context, compactionInstantTime);
-    HoodieWriteMetadata<O> compactionWriteMetadata = convertToOutputMetadata(writeMetadata);
+    HoodieWriteMetadata<T> processedWriteMetadata = processWriteMetadata(table, writeMetadata, compactionInstantTime);
+    HoodieWriteMetadata<O> compactionWriteMetadata = convertToOutputMetadata(processedWriteMetadata);
     if (shouldComplete) {
       commitCompaction(compactionInstantTime, compactionWriteMetadata, Option.of(table));
     }
     return compactionWriteMetadata;
   }

+  protected HoodieWriteMetadata<T> processWriteMetadata(HoodieTable table, HoodieWriteMetadata<T> writeMetadata, String instantTime) {
+    return writeMetadata;
+  }
+
   public void commitCompaction(String compactionInstantTime, HoodieWriteMetadata<O> compactionWriteMetadata, Option<HoodieTable> tableOpt) {
     // dereferencing the write dag for compaction for the first time.
-    List<HoodieWriteStat> writeStats = triggerWritesAndFetchWriteStats(compactionWriteMetadata);
+    Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> dataTableAndMetadataTableHoodieWriteStats = triggerWritesAndFetchWriteStats(compactionWriteMetadata);
     // Fetch commit metadata from HoodieWriteMetadata and update HoodieWriteStat
-    CommonClientUtils.stitchCompactionHoodieWriteStats(compactionWriteMetadata, writeStats);
+    CommonClientUtils.stitchCompactionHoodieWriteStats(compactionWriteMetadata, dataTableAndMetadataTableHoodieWriteStats.getKey());
     metrics.emitCompactionCompleted();
     HoodieTable table = tableOpt.orElseGet(() -> createTable(config, context.getStorageConf()));
-    completeCompaction(compactionWriteMetadata.getCommitMetadata().get(), table, compactionInstantTime);
+    completeCompaction(compactionWriteMetadata.getCommitMetadata().get(), table, compactionInstantTime, dataTableAndMetadataTableHoodieWriteStats.getValue());
   }

   /**
    * The API triggers the data write and fetches the corresponding write stats using the write metadata.
+   * When streaming writes to the metadata table are enabled, the metadata table writes are expected to be triggered here and the List of {@link HoodieWriteStat}
+   * is returned as part of this call.
    */
-  protected abstract List<HoodieWriteStat> triggerWritesAndFetchWriteStats(HoodieWriteMetadata<O> writeMetadata);
+  protected abstract Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> triggerWritesAndFetchWriteStats(HoodieWriteMetadata<O> writeMetadata);

   /**
    * Commit Compaction and track metrics.
    */
-  protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
+  protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime, List<HoodieWriteStat> metadataWriteStatsSoFar) {
Review Comment:
nit on naming: `metadataWriteStatsSoFar` could be
`currentMetadataWriteStats`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1185,6 +1203,13 @@ public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
     return logCompact(logCompactionInstantTime, false);
   }

+  /**
+   * Commit Log Compaction and track metrics.
+   **/
+  protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
Review Comment:
This method is unused; is it still required?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieMetadataWriteWrapper.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstraction for data table write client and table service client to write to metadata table.
+ */
+public class HoodieMetadataWriteWrapper {
Review Comment:
I think we can come up with a better name than `Wrapper`, maybe something
that indicates this is for streaming writes? `StreamingMetadataWriteHandler` or
something similar?
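For illustration, a sketch with one of the suggested names (the name and javadoc here are only illustrative, not a final proposal):

```java
/**
 * Handles streaming writes to the metadata table on behalf of the data table
 * write client and table service client.
 */
public class StreamingMetadataWriteHandler {
  // body unchanged from HoodieMetadataWriteWrapper
}
```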
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -36,17 +39,58 @@
 import org.apache.spark.api.java.JavaRDD;

 import java.util.List;
+import java.util.stream.Collectors;

 public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
+
+  private HoodieMetadataWriteWrapper metadataWriterWrapper = new HoodieMetadataWriteWrapper();

   protected SparkRDDTableServiceClient(HoodieEngineContext context,
                                        HoodieWriteConfig clientConfig,
                                        Option<EmbeddedTimelineService> timelineService) {
     super(context, clientConfig, timelineService);
   }

   @Override
-  protected List<HoodieWriteStat> triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
-    return writeMetadata.getWriteStatuses().map(writeStatus -> writeStatus.getStat()).collect();
+  protected Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> tableServiceWriteMetadata) {
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both the data table and the metadata table get triggered at this juncture.
+    // If not, only the data table writes get triggered here.
+    // When streaming writes are enabled, the data table's WriteStatus is expected to contain all stats required to generate metadata table records, so each object will be larger.
+    // So, here we drop all additional stats and error records to retain only the required information and prevent collecting large objects on the driver.
+    List<SparkRDDWriteClient.SlimWriteStats> writeStatusMetadataTrackerList = tableServiceWriteMetadata.getWriteStatuses()
+        .map(writeStatus -> new SparkRDDWriteClient.SlimWriteStats(writeStatus.isMetadataTable(), writeStatus.getTotalRecords(), writeStatus.getTotalErrorRecords(),
Review Comment:
This is similar to the code in SparkRDDWriteClient. We can expose a static
method on `SlimWriteStats` like `static List<SlimWriteStats>
from(JavaRDD<WriteStatus> writeStatuses)` to keep the code consistent between
the two clients.
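A sketch of that factory, assuming `SlimWriteStats` keeps its current constructor (the mapping below just mirrors the quoted code):

```java
// Hypothetical static factory on SparkRDDWriteClient.SlimWriteStats.
// Slims each WriteStatus down on the executors so that only small objects
// are collected back to the driver.
public static List<SlimWriteStats> from(JavaRDD<WriteStatus> writeStatuses) {
  return writeStatuses
      .map(writeStatus -> new SlimWriteStats(writeStatus.isMetadataTable(),
          writeStatus.getTotalRecords(), writeStatus.getTotalErrorRecords(), writeStatus.getStat()))
      .collect();
}
```

Both clients could then start with `List<SlimWriteStats> trackerList = SlimWriteStats.from(writeMetadata.getWriteStatuses());` and keep only the filtering logic inline.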
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1539,6 +1543,7 @@ protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieR
     Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> result = tagRecordsWithLocation(partitionRecordsMap, isInitializing);
     HoodieData<HoodieRecord> preppedRecords = result.getKey();
     I preppedRecordInputs = convertHoodieDataToEngineSpecificData(preppedRecords);
+    List<HoodieFileGroupId> updatedMDTFileGroupIds = result.getValue();
Review Comment:
This is unused; is that intentional?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -36,17 +39,58 @@
 import org.apache.spark.api.java.JavaRDD;

 import java.util.List;
+import java.util.stream.Collectors;

 public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
+
+  private HoodieMetadataWriteWrapper metadataWriterWrapper = new HoodieMetadataWriteWrapper();

   protected SparkRDDTableServiceClient(HoodieEngineContext context,
                                        HoodieWriteConfig clientConfig,
                                        Option<EmbeddedTimelineService> timelineService) {
     super(context, clientConfig, timelineService);
   }

   @Override
-  protected List<HoodieWriteStat> triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
-    return writeMetadata.getWriteStatuses().map(writeStatus -> writeStatus.getStat()).collect();
+  protected Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> tableServiceWriteMetadata) {
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both the data table and the metadata table get triggered at this juncture.
+    // If not, only the data table writes get triggered here.
+    // When streaming writes are enabled, the data table's WriteStatus is expected to contain all stats required to generate metadata table records, so each object will be larger.
+    // So, here we drop all additional stats and error records to retain only the required information and prevent collecting large objects on the driver.
+    List<SparkRDDWriteClient.SlimWriteStats> writeStatusMetadataTrackerList = tableServiceWriteMetadata.getWriteStatuses()
+        .map(writeStatus -> new SparkRDDWriteClient.SlimWriteStats(writeStatus.isMetadataTable(), writeStatus.getTotalRecords(), writeStatus.getTotalErrorRecords(),
+            writeStatus.getStat())).collect();
+
+    List<HoodieWriteStat> dataTableWriteStats = writeStatusMetadataTrackerList.stream().filter(entry -> !entry.isMetadataTable()).map(entry -> entry.getWriteStat()).collect(Collectors.toList());
+    List<HoodieWriteStat> mdtWriteStats = writeStatusMetadataTrackerList.stream().filter(entry -> entry.isMetadataTable()).map(entry -> entry.getWriteStat()).collect(Collectors.toList());
+
+    if (isMetadataTable) {
+      ValidationUtils.checkArgument(dataTableWriteStats.isEmpty(), "For Metadata table,"
+          + "we do not expect any writes having WriteStatus referring to data table. ");
Review Comment:
Nitpick: trim trailing space
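Note the concatenation is also missing a space between the two literals, so the message renders as "table,we". Folding it into a single literal fixes both issues, e.g.:

```java
ValidationUtils.checkArgument(dataTableWriteStats.isEmpty(),
    "For Metadata table, we do not expect any writes having WriteStatus referring to data table.");
```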
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieMetadataWriteWrapper.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstraction for data table write client and table service client to write to metadata table.
+ */
+public class HoodieMetadataWriteWrapper {
+
+  // Cached HoodieTableMetadataWriter for each action in data table. This will be cleaned up when action is completed or when write client is closed.
+  protected Map<String, Option<HoodieTableMetadataWriter>> metadataWriterMap = new ConcurrentHashMap<>();
Review Comment:
Can this be `private final`? Is it expected to undergo concurrent operations? If not, let's use a `HashMap`.
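i.e., something like this sketch, assuming the map is only ever touched from one thread (that assumption would need to be verified):

```java
// Requires java.util.HashMap; safe only if there is no concurrent access.
private final Map<String, Option<HoodieTableMetadataWriter>> metadataWriterMap = new HashMap<>();
```

If concurrent table services can in fact race on this map, keeping `ConcurrentHashMap` while still making the field `private final` would be the safer middle ground.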
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieMetadataWriteWrapper.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstraction for data table write client and table service client to write to metadata table.
+ */
+public class HoodieMetadataWriteWrapper {
+
+  // Cached HoodieTableMetadataWriter for each action in data table. This will be cleaned up when action is completed or when write client is closed.
+  protected Map<String, Option<HoodieTableMetadataWriter>> metadataWriterMap = new ConcurrentHashMap<>();
+
+ /**
+   * Called by the data table write client and table service client to perform streaming writes to the metadata table.
+ * @param table {@link HoodieTable} instance for data table of interest.
+ * @param dataTableWriteStatuses {@link WriteStatus} from data table writes.
+ * @param instantTime instant time of interest.
Review Comment:
Is this the instant time in the metadata table or the data table?
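If it is the data table instant (an assumption worth confirming in the PR), the javadoc could spell it out, e.g.:

```java
 * @param instantTime instant time on the data table timeline whose writes are
 *                    being streamed into the metadata table.
```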
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -36,17 +39,58 @@
 import org.apache.spark.api.java.JavaRDD;

 import java.util.List;
+import java.util.stream.Collectors;

 public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
+
+  private HoodieMetadataWriteWrapper metadataWriterWrapper = new HoodieMetadataWriteWrapper();
Review Comment:
make this `final`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -270,7 +269,7 @@ protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
         // update the new location of the record, so we know where to find it next
         if (needsUpdateLocation()) {
           record.unseal();
-          record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+          record.setNewLocation(newRecordLocation);
Review Comment:
The `fileId` input to the method is now unused; should it be removed?
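If nothing else depends on it, the signature could shrink to something like this (a sketch; overriding subclasses and call sites would need the matching update):

```java
// Hypothetical trimmed signature, assuming no subclass still relies on fileId:
protected void init(Iterator<HoodieRecord<T>> newRecordsItr) {
  // ... existing body unchanged, minus the unused parameter
}
```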
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -70,6 +70,7 @@ public class SparkRDDWriteClient<T> extends BaseHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
   private static final Logger LOG = LoggerFactory.getLogger(SparkRDDWriteClient.class);
+  private HoodieMetadataWriteWrapper metadataWriteWrapper = new HoodieMetadataWriteWrapper();
Review Comment:
make this `final` as well?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -36,17 +39,58 @@
 import org.apache.spark.api.java.JavaRDD;

 import java.util.List;
+import java.util.stream.Collectors;

 public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
+
+  private HoodieMetadataWriteWrapper metadataWriterWrapper = new HoodieMetadataWriteWrapper();

   protected SparkRDDTableServiceClient(HoodieEngineContext context,
                                        HoodieWriteConfig clientConfig,
                                        Option<EmbeddedTimelineService> timelineService) {
     super(context, clientConfig, timelineService);
   }

   @Override
-  protected List<HoodieWriteStat> triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
-    return writeMetadata.getWriteStatuses().map(writeStatus -> writeStatus.getStat()).collect();
+  protected Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> tableServiceWriteMetadata) {
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both the data table and the metadata table get triggered at this juncture.
+    // If not, only the data table writes get triggered here.
+    // When streaming writes are enabled, the data table's WriteStatus is expected to contain all stats required to generate metadata table records, so each object will be larger.
+    // So, here we drop all additional stats and error records to retain only the required information and prevent collecting large objects on the driver.
+    List<SparkRDDWriteClient.SlimWriteStats> writeStatusMetadataTrackerList = tableServiceWriteMetadata.getWriteStatuses()
+        .map(writeStatus -> new SparkRDDWriteClient.SlimWriteStats(writeStatus.isMetadataTable(), writeStatus.getTotalRecords(), writeStatus.getTotalErrorRecords(),
+            writeStatus.getStat())).collect();
+
+    List<HoodieWriteStat> dataTableWriteStats = writeStatusMetadataTrackerList.stream().filter(entry -> !entry.isMetadataTable()).map(entry -> entry.getWriteStat()).collect(Collectors.toList());
+    List<HoodieWriteStat> mdtWriteStats = writeStatusMetadataTrackerList.stream().filter(entry -> entry.isMetadataTable()).map(entry -> entry.getWriteStat()).collect(Collectors.toList());
+
+    if (isMetadataTable) {
+      ValidationUtils.checkArgument(dataTableWriteStats.isEmpty(), "For Metadata table,"
+          + "we do not expect any writes having WriteStatus referring to data table. ");
+      dataTableWriteStats.clear();
+      dataTableWriteStats.addAll(mdtWriteStats);
+      mdtWriteStats.clear();
+    }
+    return Pair.of(dataTableWriteStats, mdtWriteStats);
+  }
+
+  @Override
+  protected HoodieWriteMetadata<HoodieData<WriteStatus>> processWriteMetadata(HoodieTable table, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata, String instantTime) {
+    if (!isMetadataTable && config.isMetadataTableEnabled() && config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion())) {
Review Comment:
This condition is the same as the one on line 87, and I think there are similar checks in SparkRDDWriteClient. Is there a common place to put this logic to keep it consistent?
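For example, a static helper could own this check; the class placement and method name below are only suggestions (the condition itself is copied from the quoted diff):

```java
// Hypothetical shared helper, e.g. on CommonClientUtils:
public static boolean isStreamingWritesToMetadataEnabled(boolean isMetadataTable,
                                                         HoodieWriteConfig config,
                                                         HoodieTable table) {
  return !isMetadataTable
      && config.isMetadataTableEnabled()
      && config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
}
```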
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]