alexeykudinkin commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r813173761
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -526,6 +526,7 @@ private boolean hasOperationField() {
      Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
      return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null;
    } catch (Exception e) {
+     LOG.info(String.format("Failed to read operation field from avro schema (%s)", e.getMessage()));
Review comment:
I would rather reserve the "error" level for things that flag a malfunction and as such require immediate attention. This just notes that a particular field we tried to fetch wasn't found.
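To make the intent concrete, here is a minimal, self-contained sketch of the pattern (hypothetical class name and a hard-coded field name standing in for the actual Hudi code): a failed lookup is treated as "field absent" and logged at INFO rather than ERROR.

```java
import java.util.function.Supplier;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OperationFieldCheck {

  private static final Logger LOG = LoggerFactory.getLogger(OperationFieldCheck.class);

  // Stands in for HoodieRecord.OPERATION_METADATA_FIELD; hard-coded to keep the sketch self-contained.
  private static final String OPERATION_METADATA_FIELD = "_hoodie_operation";

  // Resolving the schema from a data file may fail (e.g. for an empty table); that is an
  // expected condition rather than a malfunction, hence INFO rather than ERROR.
  static boolean hasOperationField(Supplier<Schema> schemaFromDataFile) {
    try {
      Schema tableAvroSchema = schemaFromDataFile.get();
      return tableAvroSchema.getField(OPERATION_METADATA_FIELD) != null;
    } catch (Exception e) {
      LOG.info(String.format("Failed to read operation field from avro schema (%s)", e.getMessage()));
      return false;
    }
  }

  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("rec").fields()
        .requiredString("_hoodie_record_key")
        .endRecord();

    System.out.println(hasOperationField(() -> schema));  // false: field not present
    System.out.println(hasOperationField(() -> {          // false: lookup failed, logged at INFO
      throw new RuntimeException("no data files");
    }));
  }
}
```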
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
##########
@@ -32,10 +32,14 @@
public ComplexAvroKeyGenerator(TypedProperties props) {
super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
-        .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
-    this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
-        .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
+    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());
+    this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());
Review comment:
Good call. This was originally changed to compensate for a missing config property (which was previously stubbed with "", which didn't make sense), but then I had to selectively roll back that change since it was backfiring elsewhere, and that's how it ended up like this.
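For anyone following along, the reason the `isEmpty` filter matters is that `"".split(",")` in Java yields a single empty element, so a property stubbed with "" would otherwise produce a bogus blank key field. A standalone sketch of the same parsing (TypedProperties plumbing omitted):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class KeyFieldParsing {

  // Same parsing as in ComplexAvroKeyGenerator: split on commas, trim, and drop blanks.
  static List<String> parseFields(String prop) {
    return Arrays.stream(prop.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // "".split(",") yields [""], so without the isEmpty filter a stubbed ""
    // property would produce a single bogus empty field.
    System.out.println(parseFields(""));          // []
    System.out.println(parseFields("a, b,,c "));  // [a, b, c]
  }
}
```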
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -314,35 +309,65 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
    private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
      val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
-     logRecords.get(curKey).getData.combineAndGetUpdateValue(
-       historyAvroRecord, tableAvroSchema, payloadProps)
+     logRecords.get(curKey).getData
+       .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, payloadProps)
    }
  }
- }
+}
private object HoodieMergeOnReadRDD {
  val CONFIG_INSTANTIATION_LOCK = new Object()
  def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
    val fs = FSUtils.getFs(split.tablePath, config)
-   HoodieMergedLogRecordScanner.newBuilder()
-     .withFileSystem(fs)
-     .withBasePath(split.tablePath)
-     .withLogFilePaths(split.logPaths.get.asJava)
-     .withReaderSchema(logSchema)
-     .withLatestInstantTime(split.latestCommit)
-     .withReadBlocksLazily(
-       Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-         HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-         .getOrElse(false))
-     .withReverseReader(false)
-     .withBufferSize(
-       config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-         HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-     .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-     .withSpillableMapBasePath(
-       config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-         HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-     .build()
+
+   if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+     val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
+     val dataTableBasePath = getDataTableBasePathFromMetadataTable(split.tablePath)
+     val metadataTable = new HoodieBackedTableMetadata(
+       new HoodieLocalEngineContext(config), metadataConfig, dataTableBasePath,
+       config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+
+     // NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level
+     //       of indirection among MT partitions)
+     val relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(split.tablePath), getPartitionPath(split))
+     metadataTable.getLogRecordScanner(split.logFiles.get.asJava, relativePartitionPath).getLeft
+   } else {
+     HoodieMergedLogRecordScanner.newBuilder()
+       .withFileSystem(fs)
+       .withBasePath(split.tablePath)
+       .withLogFilePaths(split.logFiles.get.map(logFile => getFilePath(logFile.getPath)).asJava)
Review comment:
Not sure I understand your Q -- the builder does require exactly the log paths, but I'm propagating `HoodieLogFile` as part of the split since it's necessary to be able to read MT records (line 335).
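Roughly, the idea (sketched below with hypothetical stand-in types, not the actual `HoodieMergeOnReadFileSplit`/`HoodieLogFile` classes) is that the split carries full log-file descriptors so the MT scanner can consume them directly, while the merged-scanner builder only needs the path strings derived from them:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class LogFileSplitSketch {

  // Hypothetical stand-in for HoodieLogFile: the split keeps the full descriptor...
  static class LogFile {
    final String path;
    final long size;
    LogFile(String path, long size) { this.path = path; this.size = size; }
  }

  // ...because the MT log-record scanner needs the log-file objects themselves,
  // while the merged-scanner builder only takes plain path strings, which are
  // trivially derived from the descriptors.
  static List<String> toLogFilePaths(List<LogFile> logFiles) {
    return logFiles.stream().map(f -> f.path).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<LogFile> logFiles = Arrays.asList(
        new LogFile("/tbl/p1/.f1_1.log.1", 1024),
        new LogFile("/tbl/p1/.f1_1.log.2", 2048));
    System.out.println(toLogFilePaths(logFiles)); // [/tbl/p1/.f1_1.log.1, /tbl/p1/.f1_1.log.2]
  }
}
```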
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -314,35 +309,65 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
    private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
      val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
-     logRecords.get(curKey).getData.combineAndGetUpdateValue(
-       historyAvroRecord, tableAvroSchema, payloadProps)
+     logRecords.get(curKey).getData
+       .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, payloadProps)
    }
  }
- }
+}
private object HoodieMergeOnReadRDD {
  val CONFIG_INSTANTIATION_LOCK = new Object()
  def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
    val fs = FSUtils.getFs(split.tablePath, config)
-   HoodieMergedLogRecordScanner.newBuilder()
-     .withFileSystem(fs)
-     .withBasePath(split.tablePath)
-     .withLogFilePaths(split.logPaths.get.asJava)
-     .withReaderSchema(logSchema)
-     .withLatestInstantTime(split.latestCommit)
-     .withReadBlocksLazily(
-       Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-         HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-         .getOrElse(false))
-     .withReverseReader(false)
-     .withBufferSize(
-       config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-         HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-     .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-     .withSpillableMapBasePath(
-       config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-         HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-     .build()
+
+   if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+     val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
Review comment:
Sorry, folks, not sure I understand what your point is: it's already initialized with mostly defaults.