alexeykudinkin commented on code in PR #5523:
URL: https://github.com/apache/hudi/pull/5523#discussion_r923729365


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -57,187 +92,153 @@ public int numFields() {
   }
 
   @Override
-  public void setNullAt(int i) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = null;
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = null;
-          break;
-        }
-        case 2: {
-          this.recordKey = null;
-          break;
-        }
-        case 3: {
-          this.partitionPath = null;
-          break;
-        }
-        case 4: {
-          this.fileName = null;
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
-      }
+  public void setNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      metaFields[ordinal] = null;
     } else {
-      row.setNullAt(i);
+      row.setNullAt(rebaseOrdinal(ordinal));
     }
   }
 
   @Override
-  public void update(int i, Object value) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = value.toString();
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = value.toString();
-          break;
-        }
-        case 2: {
-          this.recordKey = value.toString();
-          break;
-        }
-        case 3: {
-          this.partitionPath = value.toString();
-          break;
-        }
-        case 4: {
-          this.fileName = value.toString();
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
+  public void update(int ordinal, Object value) {
+    if (ordinal < metaFields.length) {
+      if (value instanceof UTF8String) {
+        metaFields[ordinal] = (UTF8String) value;
+      } else if (value instanceof String) {
+        metaFields[ordinal] = UTF8String.fromString((String) value);
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Could not update the row at (%d) with value of type 
(%s), either UTF8String or String are expected", ordinal, 
value.getClass().getSimpleName()));
       }
     } else {
-      row.update(i, value);
+      row.update(rebaseOrdinal(ordinal), value);
     }
   }
 
-  private String getMetaColumnVal(int ordinal) {
-    switch (ordinal) {
-      case 0: {
-        return commitTime;
-      }
-      case 1: {
-        return commitSeqNumber;
-      }
-      case 2: {
-        return recordKey;
-      }
-      case 3: {
-        return partitionPath;
-      }
-      case 4: {
-        return fileName;
-      }
-      default: throw new IllegalArgumentException("Not expected");
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      return metaFields[ordinal] == null;
     }
+    return row.isNullAt(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public boolean isNullAt(int ordinal) {
+  public UTF8String getUTF8String(int ordinal) {
+    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      return metaFields[ordinal];
+    }
+    return row.getUTF8String(rebaseOrdinal(ordinal));
+  }
+
+  @Override
+  public Object get(int ordinal, DataType dataType) {
     if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return null == getMetaColumnVal(ordinal);
+      validateMetaFieldDataType(dataType);
+      return metaFields[ordinal];
     }
-    return row.isNullAt(ordinal);
+    return row.get(rebaseOrdinal(ordinal), dataType);
   }
 
   @Override
   public boolean getBoolean(int ordinal) {
-    return row.getBoolean(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Boolean.class);
+    return row.getBoolean(rebaseOrdinal(ordinal));
   }
 
   @Override
   public byte getByte(int ordinal) {
-    return row.getByte(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte.class);
+    return row.getByte(rebaseOrdinal(ordinal));
   }
 
   @Override
   public short getShort(int ordinal) {
-    return row.getShort(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Short.class);
+    return row.getShort(rebaseOrdinal(ordinal));
   }
 
   @Override
   public int getInt(int ordinal) {
-    return row.getInt(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Integer.class);
+    return row.getInt(rebaseOrdinal(ordinal));
   }
 
   @Override
   public long getLong(int ordinal) {
-    return row.getLong(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Long.class);
+    return row.getLong(rebaseOrdinal(ordinal));
   }
 
   @Override
   public float getFloat(int ordinal) {
-    return row.getFloat(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Float.class);
+    return row.getFloat(rebaseOrdinal(ordinal));
   }
 
   @Override
   public double getDouble(int ordinal) {
-    return row.getDouble(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Double.class);
+    return row.getDouble(rebaseOrdinal(ordinal));
   }
 
   @Override
   public Decimal getDecimal(int ordinal, int precision, int scale) {
-    return row.getDecimal(ordinal, precision, scale);
-  }
-
-  @Override
-  public UTF8String getUTF8String(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getUTF8String(ordinal);
-  }
-
-  @Override
-  public String getString(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return new String(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getString(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Decimal.class);
+    return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
   }
 
   @Override
   public byte[] getBinary(int ordinal) {
-    return row.getBinary(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte[].class);
+    return row.getBinary(rebaseOrdinal(ordinal));
   }
 
   @Override
   public CalendarInterval getInterval(int ordinal) {
-    return row.getInterval(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
+    return row.getInterval(rebaseOrdinal(ordinal));
   }
 
   @Override
   public InternalRow getStruct(int ordinal, int numFields) {
-    return row.getStruct(ordinal, numFields);
+    ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
+    return row.getStruct(rebaseOrdinal(ordinal), numFields);
   }
 
   @Override
   public ArrayData getArray(int ordinal) {
-    return row.getArray(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
+    return row.getArray(rebaseOrdinal(ordinal));
   }
 
   @Override
   public MapData getMap(int ordinal) {
-    return row.getMap(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, MapData.class);
+    return row.getMap(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public Object get(int ordinal, DataType dataType) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
+  public InternalRow copy() {
+    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), 
row.copy(), containsMetaFields);
+  }
+
+  private int rebaseOrdinal(int ordinal) {
+    // NOTE: In cases when source row does not contain meta fields, we will 
have to
+    //       rebase ordinal onto its indexes
+    return containsMetaFields ? ordinal : ordinal - metaFields.length;
+  }
+
+  private void validateMetaFieldDataType(DataType dataType) {
+    if (!dataType.sameType(StringType$.MODULE$)) {
+      throw new ClassCastException(String.format("Can not cast meta-field of 
type UTF8String to %s", dataType.simpleString()));
     }
-    return row.get(ordinal, dataType);
   }
 
-  @Override
-  public InternalRow copy() {
-    return new HoodieInternalRow(commitTime, commitSeqNumber, recordKey, 
partitionPath, fileName, row.copy());
+  private void ruleOutMetaFieldsAccess(int ordinal, Class<?> expectedDataType) 
{

Review Comment:
   It's a good call-out; we should be vigilant to make sure we're not tripping 
into easily avoidable perf traps. In this case, though, I don't think this 
method will have much impact -- it's a single conditional with a simple check that 
will be inlined by the JVM (its cost won't be much). Later on, if we see it 
showing up in the profiles, we can follow up and make validations optional, but 
out of the gate I wouldn't try to optimize it out since its impact won't be 
material.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {
+
+  /**
+   * Fetches (nested) value w/in provided [[Row]] uniquely identified by the 
provided nested-field path
+   * previously composed by [[composeNestedFieldPath]]
+   */
+  def getNestedRowValue(row: Row, nestedFieldPath: NestedFieldPath): Any = {

Review Comment:
   This is brand-new code



##########
hudi-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java:
##########
@@ -52,7 +52,7 @@ protected KeyGenerator(TypedProperties config) {
    * @return list of field names, when concatenated make up the record key.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public List<String> getRecordKeyFieldNames() {

Review Comment:
   Good call, I think we can avoid making this breaking change



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.unsafe;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A helper class to write {@link UTF8String}s to an internal buffer and build 
the concatenated
+ * {@link UTF8String} at the end.
+ */
+public class UTF8StringBuilder {
+
+  private static final int ARRAY_MAX = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
+
+  private byte[] buffer;
+  private int cursor = Platform.BYTE_ARRAY_OFFSET;
+
+  public UTF8StringBuilder() {

Review Comment:
   This is cloned from Spark



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java:
##########
@@ -46,16 +47,27 @@
  * field in the partition path, use field1:simple 3. If you want your table to 
be non partitioned, simply leave it as blank.
  *
  * RecordKey is internally generated using either SimpleKeyGenerator or 
ComplexKeyGenerator.
+ *
+ * @deprecated
  */
+@Deprecated

Review Comment:
   Good call! I think I initially marked it for deprecation purely from the 
standpoint that its implementation isn't really viable (it initializes a new 
key-gen for every call to getRecordKey/getPartitionPath), but I didn't offer a 
suggested replacement for it. Will address.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {
+
+  /**
+   * Fetches (nested) value w/in provided [[Row]] uniquely identified by the 
provided nested-field path
+   * previously composed by [[composeNestedFieldPath]]
+   */
+  def getNestedRowValue(row: Row, nestedFieldPath: NestedFieldPath): Any = {
+    var curRow = row
+    for (idx <- nestedFieldPath.parts.indices) {
+      val (ord, f) = nestedFieldPath.parts(idx)
+      if (curRow.isNullAt(ord)) {
+        // scalastyle:off return
+        if (f.nullable) return null

Review Comment:
   Structurally they are very similar; the goal here was to:
   
   1. Clean up the code, make it more concise (it just needs to do a simple 
iteration)
   2. Convert it to Scala to avoid cumbersome Java <> Scala interactions when 
working w/ `Row`s



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -157,7 +152,29 @@ public List<HoodieInternalWriteStatus> getWriteStatuses() 
throws IOException {
     return writeStatusList;
   }
 
-  public void abort() {
+  public void abort() {}
+
+  public void close() throws IOException {
+    for (HoodieRowCreateHandle rowCreateHandle : handles.values()) {
+      writeStatusList.add(rowCreateHandle.close());
+    }
+    handles.clear();
+    handle = null;
+  }
+
+  private UTF8String extractPartitionPath(InternalRow row) {
+    if (populateMetaFields) {
+      // In case meta-fields are materialized w/in the table itself, we can 
just simply extract
+      // partition path from there
+      //
+      // NOTE: Helper keeps track of [[lastKnownPartitionPath]] as 
[[UTF8String]] to avoid
+      //       conversion from Catalyst internal representation into a 
[[String]]
+      return row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS);
+    } else if (keyGeneratorOpt.isPresent()) {
+      return keyGeneratorOpt.get().getPartitionPath(row, structType);

Review Comment:
   Didn't miss it -- it's actually one of the primary premises of this PR to get 
rid of such special handling: special handling duplicates the logic across many 
places (key-gen, bulk-insert handler, etc.), which, sure enough, leads to bugs 
induced by code getting out of sync (for example, broken hive-style 
partitioning/encoding in the bulk-insert path).
   
   Keys/partition paths should be produced by a single source of truth (SoT), which in this case is 
the key generator (KG), and they should be produced in a performant (i.e., engine-specific) way; 
that's exactly what this PR tackles.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.unsafe;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A helper class to write {@link UTF8String}s to an internal buffer and build 
the concatenated
+ * {@link UTF8String} at the end.
+ */
+public class UTF8StringBuilder {
+
+  private static final int ARRAY_MAX = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
+
+  private byte[] buffer;
+  private int cursor = Platform.BYTE_ARRAY_OFFSET;
+
+  public UTF8StringBuilder() {

Review Comment:
   And it's covered by existing tests, though we don't have dedicated unit tests for it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to