vinothchandar commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r617739216



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
##########
@@ -96,9 +97,14 @@ public HoodieInsertValueGenResult(T record, Schema schema) {
    * Transformer function to help transform a HoodieRecord. This transformer 
is used by BufferedIterator to offload some
    * expensive operations of transformation to the reader thread.
    */
+  static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
+      Schema schema, HoodieWriteConfig config) {
+    return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, 
schema, config.getProps());
+  }
+
   static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
       Schema schema) {
-    return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, 
schema);
+    return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, 
schema, new Properties());

Review comment:
       This function is invoked once per record. Could we avoid allocating an 
empty `new Properties()` each time, to reduce the per-record overhead?
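
       A minimal sketch of the idea, not the actual Hudi code: `GenResult` and 
`TransformFunctions` are hypothetical stand-ins for `HoodieInsertValueGenResult` 
and the enclosing class. It assumes the shared `Properties` instance is never 
mutated by callers.

```java
import java.util.Properties;
import java.util.function.Function;

// Hypothetical stand-in for HoodieInsertValueGenResult; only the constructor shape matters.
final class GenResult {
  final String record;
  final Properties props;

  GenResult(String record, Properties props) {
    this.record = record;
    this.props = props;
  }
}

final class TransformFunctions {
  // Allocated once and shared by every invocation.
  // Assumption: callers treat it as read-only, since Properties itself is mutable.
  private static final Properties EMPTY_PROPS = new Properties();

  // Shape of the code under review: one new Properties object per record.
  static Function<String, GenResult> perRecordAllocation() {
    return record -> new GenResult(record, new Properties());
  }

  // Suggested shape: every record reuses the same empty instance.
  static Function<String, GenResult> sharedInstance() {
    return record -> new GenResult(record, EMPTY_PROPS);
  }
}
```

       The trade-off is that a shared mutable `Properties` is only safe if 
nothing downstream writes to it.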

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -162,6 +162,8 @@
       "hoodie.write.meta.key.prefixes";
   public static final String DEFAULT_WRITE_META_KEY_PREFIXES = "";
 
+  public static final String WRITE_SCHEMA = "hoodie.write.schema";

Review comment:
       Could you please add a Javadoc comment here?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
##########
@@ -63,14 +62,14 @@
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
                             String partitionPath, String fileId, 
TaskContextSupplier taskContextSupplier) {
-    this(config, instantTime, hoodieTable, partitionPath, fileId, 
getWriterSchemaIncludingAndExcludingMetadataPair(config),
+    this(config, instantTime, hoodieTable, partitionPath, fileId, 
Option.empty(),
         taskContextSupplier);
   }
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
-                            String partitionPath, String fileId, Pair<Schema, 
Schema> writerSchemaIncludingAndExcludingMetadataPair,
+                            String partitionPath, String fileId, 
Option<Schema> specifySchema,

Review comment:
       Did you mean `specifiedSchema`? Btw, is `specifySchema` the same as 
`writeSchema`? It would be good to avoid introducing new terminology.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -189,8 +189,12 @@ protected boolean isUpdateRecord(HoodieRecord<T> 
hoodieRecord) {
   private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) 
{
     Option<Map<String, String>> recordMetadata = 
hoodieRecord.getData().getMetadata();
     try {
-      Option<IndexedRecord> avroRecord = 
hoodieRecord.getData().getInsertValue(writerSchema);
+      Option<IndexedRecord> avroRecord = 
hoodieRecord.getData().getInsertValue(inputSchema,
+          config.getProps());
       if (avroRecord.isPresent()) {
+        if (avroRecord.get() == HoodieMergeHandle.IGNORE_RECORD) {

Review comment:
       Let's move `IGNORE_RECORD` to the base class `HoodieWriteHandle`?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
##########
@@ -96,9 +97,14 @@ public HoodieInsertValueGenResult(T record, Schema schema) {
    * Transformer function to help transform a HoodieRecord. This transformer 
is used by BufferedIterator to offload some
    * expensive operations of transformation to the reader thread.
    */
+  static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
+      Schema schema, HoodieWriteConfig config) {
+    return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, 
schema, config.getProps());

Review comment:
       I think `config.getProps()` does not allocate a new properties object 
or copy any values. Good to double-check.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -54,9 +53,18 @@
 public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, 
O> extends HoodieIOHandle<T, I, K, O> {
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieWriteHandle.class);
+  /**
+   * The input schema of the incoming dataframe.
+   */
+  protected final Schema inputSchema;

Review comment:
       This part is still not clear to me, as per the comment I left above. If 
you don't mind, I'm happy to push a commit to make the naming consistent 
myself :). Thanks for your patience!

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -189,8 +189,12 @@ protected boolean isUpdateRecord(HoodieRecord<T> 
hoodieRecord) {
   private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) 
{
     Option<Map<String, String>> recordMetadata = 
hoodieRecord.getData().getMetadata();
     try {
-      Option<IndexedRecord> avroRecord = 
hoodieRecord.getData().getInsertValue(writerSchema);
+      Option<IndexedRecord> avroRecord = 
hoodieRecord.getData().getInsertValue(inputSchema,
+          config.getProps());
       if (avroRecord.isPresent()) {
+        if (avroRecord.get() == HoodieMergeHandle.IGNORE_RECORD) {

Review comment:
       Do we need `.equals()` here instead? With shuffles and data being 
exchanged, couldn't the object reference change?
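
       To illustrate the concern, here is a small sketch in plain Java 
serialization. `Marker` and `SentinelDemo` are hypothetical names, not Hudi 
classes, and whether `IGNORE_RECORD` ever crosses a serialization boundary in 
this code path is exactly the open question. The sketch only shows that a 
sentinel loses reference identity after a round trip, unless it defines 
`equals()` (or `readResolve()`).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SentinelDemo {
  // Hypothetical sentinel type, standing in for a marker such as IGNORE_RECORD.
  static final class Marker implements Serializable {
    private static final long serialVersionUID = 1L;

    // All Marker instances are considered equal, so equality survives copying.
    @Override
    public boolean equals(Object other) {
      return other instanceof Marker;
    }

    @Override
    public int hashCode() {
      return 0;
    }
  }

  static final Marker IGNORE = new Marker();

  // Serializes and deserializes a value, as a shuffle or spill to disk might do.
  static Object roundTrip(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

       After `Object copy = SentinelDemo.roundTrip(SentinelDemo.IGNORE)`, the 
check `copy == SentinelDemo.IGNORE` is false while `copy.equals(SentinelDemo.IGNORE)` 
is true. If the sentinel is only ever compared inside the JVM that produced it, 
`==` remains sufficient.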

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -192,7 +193,7 @@ protected void initializeIncomingRecordsMap() {
       long memoryForMerge = 
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
       LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
       this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, 
config.getSpillableMapBasePath(),
-              new DefaultSizeEstimator(), new 
HoodieRecordSizeEstimator(writerSchema));
+          new DefaultSizeEstimator(), new 
HoodieRecordSizeEstimator(inputSchema));

Review comment:
       Sorry to repeat this again. Why `inputSchema`, and what exactly is the 
difference between this and `writeSchema`? I think `inputSchema` is basically 
what's specified via `hoodie.write.schema`? If so, can we call this 
`suppliedTableSchema`? It's actually confusing to read input vs. table vs. 
write schema.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
##########
@@ -154,9 +157,9 @@ public void write() {
         final String key = keyIterator.next();
         HoodieRecord<T> record = recordMap.get(key);
         if (useWriterSchema) {
-          write(record, 
record.getData().getInsertValue(writerSchemaWithMetafields));
+          write(record, 
record.getData().getInsertValue(inputSchemaWithMetaFields, config.getProps()));

Review comment:
       Is `inputSchema` the same as `writeSchema`?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -278,11 +279,17 @@ public void write(GenericRecord oldRecord) {
       HoodieRecord<T> hoodieRecord = new 
HoodieRecord<>(keyToNewRecords.get(key));
       try {
         Option<IndexedRecord> combinedAvroRecord =
-            hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, 
useWriterSchema ? writerSchemaWithMetafields : writerSchema,
+            hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
+              useWriterSchema ? inputSchemaWithMetaFields : inputSchema,

Review comment:
       IIUC, we do this so that records keep getting written with the full 
table schema.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -54,9 +53,30 @@
 public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, 
O> extends HoodieIOHandle<T, I, K, O> {
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieWriteHandle.class);
+  /**
+   * A special record returned by HoodieRecordPayload which will should be 
skip by the write

Review comment:
       Some more context on when this is emitted, and why, would help readers 
understand.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -174,7 +174,8 @@ public boolean commitStats(String instantTime, 
List<HoodieWriteStat> stats, Opti
                              String commitActionType, Map<String, 
List<String>> partitionToReplaceFileIds) {
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable table = createTable(config, hadoopConf);
-    HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, 
partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), 
commitActionType);
+    HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, 
partitionToReplaceFileIds,
+        extraMetadata, operationType, config.getWriteSchema(), 
commitActionType);

Review comment:
       So would switching to `config.getWriteSchema()` here affect Hive sync? 
We use the schema from the commit file to sync to Hive. So if the write schema 
is a subset of the table schema, we could have an issue here. Did you run into 
any issues like that? I think we could actually write both into the commit 
metadata.





