alexeykudinkin commented on code in PR #7841:
URL: https://github.com/apache/hudi/pull/7841#discussion_r1096285862
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1086,39 +1086,45 @@ private void initialCommit(String createInstantTime,
List<MetadataPartitionType>
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap
= new HashMap<>();
- List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
- Map<String, Map<String, Long>> partitionToFilesMap =
partitionInfoList.stream()
- .map(p -> {
- String partitionName =
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
- return Pair.of(partitionName, p.getFileNameToSizeMap());
- })
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
- int totalDataFilesCount =
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
- List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
- if (partitionTypes.contains(MetadataPartitionType.FILES)) {
- // Record which saves the list of all partitions
- HoodieRecord allPartitionRecord =
HoodieMetadataPayload.createPartitionListRecord(partitions);
- HoodieData<HoodieRecord> filesPartitionRecords =
getFilesPartitionRecords(createInstantTime, partitionInfoList,
allPartitionRecord);
- ValidationUtils.checkState(filesPartitionRecords.count() ==
(partitions.size() + 1));
- partitionToRecordsMap.put(MetadataPartitionType.FILES,
filesPartitionRecords);
- }
+ // skip file system listing to populate metadata records if its a fresh
table.
+ // this is applicable only if the table already has N commits and metadata
is enabled at a later point in time.
+ if (!createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { //
SOLO_COMMIT_TIMESTAMP will be the intial commit time in MDT for a fresh table.
Review Comment:
You just wrapped this into a conditional, no changes to the branch itself,
right?
##########
hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java:
##########
@@ -61,13 +60,15 @@ public HoodieDataSourceInternalWriter(String instantTime,
HoodieWriteConfig writ
this.structType = structType;
this.populateMetaFields = populateMetaFields;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
- this.extraMetadataMap =
DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
+ this.sparkSession = sparkSession;
+ Map<String, String> extraMetadataMap =
DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
this.dataSourceInternalWriterHelper = new
DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
sparkSession, configuration, extraMetadataMap);
}
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
+ sparkSession.sparkContext().setJobGroup(this.getClass().getSimpleName(),
"Writing data to files using bulk_insert", true);
Review Comment:
Let's avoid setting misleading annotations -- this method is just creating
the factory but not actually writing any data until a later point in time.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -280,6 +280,7 @@ protected void commit(HoodieTable table, String
commitActionType, String instant
}
// update Metadata table
writeTableMetadata(table, instantTime, commitActionType, metadata);
+ context.setJobStatus(this.getClass().getSimpleName(),"Completing commit "
+ instantTime + " in table " + config.getTableName());
Review Comment:
What do we need this for? There's no Spark job being triggered at that point.
##########
hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java:
##########
@@ -51,7 +50,7 @@ public class HoodieDataSourceInternalWriter implements
DataSourceWriter {
private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
private final boolean populateMetaFields;
private final Boolean arePartitionRecordsSorted;
- private Map<String, String> extraMetadataMap = new HashMap<>();
+ private final SparkSession sparkSession;
Review Comment:
Writer will be passed on to executor, you can't add a Spark Session in here
##########
hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java:
##########
@@ -89,6 +90,7 @@ public void onDataWriterCommit(WriterCommitMessage message) {
@Override
public void commit(WriterCommitMessage[] messages) {
+ sparkSession.sparkContext().setJobGroup(this.getClass().getSimpleName(),
"Committing to data table", true);
Review Comment:
Same comment: what's this annotating? There are no Spark jobs being executed
at this point.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1086,39 +1086,45 @@ private void initialCommit(String createInstantTime,
List<MetadataPartitionType>
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap
= new HashMap<>();
- List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
- Map<String, Map<String, Long>> partitionToFilesMap =
partitionInfoList.stream()
- .map(p -> {
- String partitionName =
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
- return Pair.of(partitionName, p.getFileNameToSizeMap());
- })
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
- int totalDataFilesCount =
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
- List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
- if (partitionTypes.contains(MetadataPartitionType.FILES)) {
- // Record which saves the list of all partitions
- HoodieRecord allPartitionRecord =
HoodieMetadataPayload.createPartitionListRecord(partitions);
- HoodieData<HoodieRecord> filesPartitionRecords =
getFilesPartitionRecords(createInstantTime, partitionInfoList,
allPartitionRecord);
- ValidationUtils.checkState(filesPartitionRecords.count() ==
(partitions.size() + 1));
- partitionToRecordsMap.put(MetadataPartitionType.FILES,
filesPartitionRecords);
- }
+ // skip file system listing to populate metadata records if its a fresh
table.
+ // this is applicable only if the table already has N commits and metadata
is enabled at a later point in time.
+ if (!createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { //
SOLO_COMMIT_TIMESTAMP will be the intial commit time in MDT for a fresh table.
Review Comment:
Let's also invert conditional (to avoid negation, put smaller block first)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]