xushiyan commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r812857595



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
##########
@@ -32,10 +32,14 @@
 
   public ComplexAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
-        .split(",")).map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
-    this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
-        .split(",")).map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
+    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());
+    this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());

Review comment:
       @alexeykudinkin it is a bit hard for reviewers to tell whether there is 
any logic change here or just a style change. Let's try to keep style changes 
out of big PRs like this. Once we have Spotless to help standardize all 
formatting (https://issues.apache.org/jira/browse/HUDI-1237), we can forget 
about formatting manually and always leave it to the formatter.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
##########
@@ -37,10 +37,14 @@
 
   public ComplexKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
-        .split(",")).map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
-    this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
-        .split(",")).map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
+    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());
+    this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());

Review comment:
       ditto

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -314,35 +309,65 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
       private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
         val historyAvroRecord = 
serializer.serialize(curRow).asInstanceOf[GenericRecord]
-        logRecords.get(curKey).getData.combineAndGetUpdateValue(
-          historyAvroRecord, tableAvroSchema, payloadProps)
+        logRecords.get(curKey).getData
+          .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, 
payloadProps)
       }
     }
-  }
+}
 
 private object HoodieMergeOnReadRDD {
   val CONFIG_INSTANTIATION_LOCK = new Object()
 
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: 
Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    HoodieMergedLogRecordScanner.newBuilder()
-      .withFileSystem(fs)
-      .withBasePath(split.tablePath)
-      .withLogFilePaths(split.logPaths.get.asJava)
-      .withReaderSchema(logSchema)
-      .withLatestInstantTime(split.latestCommit)
-      .withReadBlocksLazily(
-        
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-          
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-          .getOrElse(false))
-      .withReverseReader(false)
-      .withBufferSize(
-        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-      .withSpillableMapBasePath(
-        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-      .build()
+
+    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+      val metadataConfig = 
HoodieMetadataConfig.newBuilder().enable(true).build()

Review comment:
       just init `metadataConfig` with all the defaults from 
HoodieMetadataConfig here?

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -314,35 +309,65 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
       private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
         val historyAvroRecord = 
serializer.serialize(curRow).asInstanceOf[GenericRecord]
-        logRecords.get(curKey).getData.combineAndGetUpdateValue(
-          historyAvroRecord, tableAvroSchema, payloadProps)
+        logRecords.get(curKey).getData
+          .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, 
payloadProps)
       }
     }
-  }
+}
 
 private object HoodieMergeOnReadRDD {
   val CONFIG_INSTANTIATION_LOCK = new Object()
 
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: 
Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    HoodieMergedLogRecordScanner.newBuilder()
-      .withFileSystem(fs)
-      .withBasePath(split.tablePath)
-      .withLogFilePaths(split.logPaths.get.asJava)
-      .withReaderSchema(logSchema)
-      .withLatestInstantTime(split.latestCommit)
-      .withReadBlocksLazily(
-        
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-          
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-          .getOrElse(false))
-      .withReverseReader(false)
-      .withBufferSize(
-        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-      .withSpillableMapBasePath(
-        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-      .build()
+
+    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+      val metadataConfig = 
HoodieMetadataConfig.newBuilder().enable(true).build()
+      val dataTableBasePath = 
getDataTableBasePathFromMetadataTable(split.tablePath)
+      val metadataTable = new HoodieBackedTableMetadata(
+        new HoodieLocalEngineContext(config), metadataConfig,
+        dataTableBasePath,
+        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, 
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+
+      // NOTE: In case of Metadata Table partition path equates to partition 
name (since there's just one level
+      //       of indirection among MT partitions)
+      val relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(split.tablePath), getPartitionPath(split))
+      metadataTable.getLogRecordScanner(split.logFiles.get.asJava, 
relativePartitionPath).getLeft
+    } else {
+      HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(split.tablePath)
+        .withLogFilePaths(split.logFiles.get.map(logFile => 
getFilePath(logFile.getPath)).asJava)

Review comment:
       can you help me understand why `logPaths` can't be used here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to